Async Workflow Execution#
Async Workflow Execution in DBOS Python enables developers to write durable, resilient workflows using Python's native async/await syntax. This feature allows workflows to perform non-blocking I/O-bound operations while maintaining DBOS's guarantees of durability, idempotency, and automatic recovery.
DBOS Python supports both async and sync workflows through a unified decorator interface. The @DBOS.workflow() decorator detects whether the decorated function is a coroutine function and selects the matching execution mode automatically. The two modes follow parallel execution paths: async workflows run on asyncio event loops, with cancellation protection via asyncio.shield(), while sync workflows run in a ThreadPoolExecutor.
Async workflows are particularly useful for I/O-bound operations, concurrent API calls, and scheduled tasks. The framework maintains all durability and recovery guarantees regardless of whether workflows are synchronous or asynchronous.
Architecture and Implementation#
Runtime Detection and Function Introspection#
The critical distinction between sync and async execution happens at line 1164 in _core.py:
return _mark_coroutine(wrapper) if inspect.iscoroutinefunction(func) else wrapper
The framework uses inspect.iscoroutinefunction() throughout the codebase to determine if a function is async, appearing in multiple locations including workflow execution in _get_wf_invoke_func(), step execution in run_step_async(), and step decorator wrapping.
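The dispatch idea can be illustrated with a small standard-library sketch. This is not the DBOS implementation; the decorator name durable and the two sample functions are hypothetical, and only inspect.iscoroutinefunction() is taken from the text above.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


def durable(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: picks a sync or async wrapper by introspection."""
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            # Async path: the wrapper is itself a coroutine function.
            return await func(*args, **kwargs)

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        # Sync path: a plain blocking call.
        return func(*args, **kwargs)

    return sync_wrapper


@durable
async def fetch(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


@durable
def compute(x: int) -> int:
    return x + 1
```

Because the async wrapper is declared with async def, callers (and any later introspection) still see a coroutine function after decoration.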
Event Loop Management#
The BackgroundEventLoop class manages async execution for workflows not started within an existing event loop:
def submit_coroutine(self, coro: Coroutine[Any, Any, T]) -> T:
    """Submit a coroutine to the background event loop"""
    if self._main_loop is not None and self._main_loop.is_running():
        return asyncio.run_coroutine_threadsafe(coro, self._main_loop).result()
    if self._loop is None:
        raise RuntimeError("Event loop not started")
    return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
This is used in _get_wf_invoke_func() at lines 492-494 to execute async workflows from synchronous contexts.
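For readers unfamiliar with the pattern, here is a simplified sketch of a background event loop running on its own thread. The class name MiniBackgroundLoop and its start/stop methods are assumptions made for illustration; the real BackgroundEventLoop may be structured differently. Only asyncio.run_coroutine_threadsafe() is shared with the excerpt above.

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")


class MiniBackgroundLoop:
    """Hypothetical, simplified stand-in for a background event loop."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._ready = threading.Event()

    def start(self) -> None:
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        self._ready.wait()  # block until the loop is actually running

    def _run(self) -> None:
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        self._loop.call_soon(self._ready.set)
        self._loop.run_forever()
        self._loop.close()

    def submit_coroutine(self, coro: Coroutine[Any, Any, T]) -> T:
        if self._loop is None:
            raise RuntimeError("Event loop not started")
        # Schedule on the loop thread, then block the calling thread for the result.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def stop(self) -> None:
        if self._loop is not None and self._thread is not None:
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join()
```

A synchronous caller can then run bg.submit_coroutine(some_coroutine()) and receive the coroutine's return value as if it had made a blocking call.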
Execution Paths#
The framework provides parallel execution paths:
Sync Path: start_workflow() returns WorkflowHandle[R], executes workflows in a thread pool via dbos._executor.submit(), and uses _execute_workflow_wthread() for execution.
Async Path: start_workflow_async() returns WorkflowHandleAsync[R], executes workflows using asyncio.create_task(), and uses _execute_workflow_async() for execution. Tasks are shielded from cancellation: asyncio.shield(asyncio.create_task(coro)).
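The effect of asyncio.shield() can be seen in isolation with the standard library alone. The sketch below is independent of DBOS, and the function names are made up for the demonstration: cancelling the awaiting caller does not cancel the shielded inner task.

```python
import asyncio


async def durable_work(log: list[str]) -> str:
    await asyncio.sleep(0.05)
    log.append("finished")
    return "completed"


async def caller(inner: "asyncio.Task[str]") -> str:
    # shield() absorbs cancellation of the awaiting caller; `inner` keeps running.
    return await asyncio.shield(inner)


async def demo() -> tuple[bool, str, list[str]]:
    log: list[str] = []
    inner = asyncio.create_task(durable_work(log))
    outer = asyncio.create_task(caller(inner))
    await asyncio.sleep(0)  # let both tasks start
    outer.cancel()
    try:
        await outer
        caller_cancelled = False
    except asyncio.CancelledError:
        caller_cancelled = True
    result = await inner  # the shielded task still runs to completion
    return caller_cancelled, result, log
```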
Database Operation Handling#
Async workflows wrap database calls in asyncio.to_thread() to avoid blocking the event loop:
status, should_execute = await asyncio.to_thread(
    _init_workflow,
    dbos,
    new_wf_ctx,
    inputs=inputs,
    # ... other parameters
)
Child workflow recording also uses asyncio.to_thread() to prevent blocking:
await asyncio.to_thread(
    dbos._sys_db.record_child_workflow,
    new_wf_ctx.parent_workflow_id,
    new_child_workflow_id,
    new_wf_ctx.parent_workflow_fid,
    get_dbos_func_name(func),
)
Defining Async Workflows#
Basic Syntax#
Async (coroutine) workflows are defined with async def and decorated with @DBOS.workflow():
@DBOS.step()
async def example_step():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            return await response.text()

@DBOS.workflow()
async def example_workflow(friend: str):
    await DBOS.sleep_async(10)
    body = await example_step()
    result = await asyncio.to_thread(example_transaction, body)
    return result
Decorator Parameters#
The @DBOS.workflow() decorator accepts several parameters that work identically for both sync and async workflows:
- name: Optional workflow name (defaults to the function's qualified name)
- max_recovery_attempts: Number of recovery attempts on failure
- serialization_type: Controls the serialization format (WorkflowSerializationFormat.DEFAULT or .PORTABLE)
- validate_args: Optional argument validator function
Starting Async Workflows#
# Start async workflow (runs in same event loop as caller)
handle: WorkflowHandleAsync = await DBOS.start_workflow_async(example_workflow, "friend_name")
# Enqueue async workflow (runs in different event loop)
# `queue` is assumed to be a Queue instance, e.g. queue = Queue("queue")
await queue.enqueue_async(example_workflow, "input_data")
# Enqueue with delay (workflow enters DELAYED status)
with SetEnqueueOptions(delay_seconds=60.0):
    delayed_handle = await queue.enqueue_async(example_workflow, "delayed_data")
Calling a coroutine workflow or starting it with DBOS.start_workflow_async runs it in the same event loop as the caller, while enqueueing with enqueue_async starts the workflow in a different event loop.
Async Steps#
Step Definition#
Async steps are fully supported through the @DBOS.step() decorator:
@DBOS.step(retries_allowed=True, max_attempts=10)
async def example_step():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            return await response.text()

@DBOS.step(preemptible=True)
async def long_running_operation():
    # This step can be immediately cancelled if its workflow is cancelled
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/long-operation") as response:
            return await response.text()
Step Options#
Step options are defined in the StepOptions TypedDict:
| Option | Type | Default | Description |
|---|---|---|---|
| name | Optional[str] | None | Optional name for the step |
| retries_allowed | bool | False | Whether the step should be retried on failure |
| interval_seconds | float | 1.0 | Initial delay between retry attempts |
| max_attempts | int | 3 | Maximum number of attempts |
| backoff_rate | float | 2.0 | Exponential backoff multiplier |
| should_retry | Optional[Callable[[BaseException], bool]] | None | Optional predicate called with a raised exception to decide whether the step should be retried. If it returns False, the exception is re-raised immediately without further retries. |
| preemptible | bool | False | If True, cancel the async step immediately if its workflow is cancelled. By default, when a workflow is cancelled, step execution is not preempted: the workflow continues running until the current step completes, then is preempted at the start of the next step. Setting preemptible=True changes this behavior so the step (and workflow) is cancelled immediately (or after a short polling interval). Only supported for async steps; attempting to use it with sync steps will raise an exception. |
The retry logic applies exponential backoff with a maximum retry interval of 1 hour (3600 seconds).
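As a worked example of the backoff arithmetic, the sketch below computes the waits between attempts. It assumes the delay before retry i is interval_seconds * backoff_rate**i, capped at 3600 seconds; the exact formula DBOS uses is not shown in this section, so treat the function (and its name retry_delays) as an illustration only.

```python
def retry_delays(
    interval_seconds: float = 1.0,
    backoff_rate: float = 2.0,
    max_attempts: int = 3,
    max_interval_seconds: float = 3600.0,
) -> list[float]:
    """Delays slept between attempts (one fewer than the number of attempts)."""
    return [
        # Assumed formula: geometric growth, clamped to the 1-hour ceiling.
        min(interval_seconds * backoff_rate**i, max_interval_seconds)
        for i in range(max_attempts - 1)
    ]
```

With the default options this gives two waits of 1 and 2 seconds; with max_attempts=10 the last wait is 256 seconds, still well under the cap.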
run_step_async#
The DBOS.run_step_async() method provides dynamic step execution:
@DBOS.workflow()
async def my_async_workflow() -> str:
    # Call async function
    result1 = await DBOS.run_step_async(
        {"name": "async_step", "retries_allowed": True, "max_attempts": 3},
        my_async_function,
        "param1"
    )
    # Call sync function in thread pool
    result2 = await DBOS.run_step_async(
        {"name": "blocking_step"},
        my_blocking_function,
        "param2"
    )
    return result1 + result2
The implementation handles both coroutine and regular functions, with sync functions executed via asyncio.to_thread() to avoid blocking the event loop.
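A minimal sketch of that dual handling, using only the standard library, might look as follows. The helper name run_either is hypothetical and omits everything step-specific (checkpointing, retries, naming).

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_either(func: Callable[..., Any], *args: Any) -> Any:
    """Await coroutine functions directly; push plain functions to a worker thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args)
    # Blocking callables run off the event loop so other tasks keep progressing.
    return await asyncio.to_thread(func, *args)


async def async_upper(s: str) -> str:
    await asyncio.sleep(0)
    return s.upper()


def blocking_reverse(s: str) -> str:
    return s[::-1]


async def demo() -> str:
    first = await run_either(async_upper, "ab")
    second = await run_either(blocking_reverse, "cd")
    return first + second
```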
Concurrency Patterns#
Parallel Step Execution#
Running async steps in parallel requires starting them in deterministic order:
# Start steps in deterministic order, then await together
tasks = [
asyncio.create_task(step1("arg1")),
asyncio.create_task(step2("arg2")),
asyncio.create_task(step3("arg3")),
asyncio.create_task(step4("arg4")),
]
# Use return_exceptions=True for proper error handling
results = await asyncio.gather(*tasks, return_exceptions=True)
Without return_exceptions=True, gather propagates the first exception to the caller immediately. The remaining tasks keep running, but their results and any later exceptions are no longer collected, so errors can be lost.
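The behavior of return_exceptions=True can be checked with plain asyncio tasks. The sketch below uses ordinary coroutines rather than DBOS steps, and the helper names are illustrative; it shows that every failure is returned in position instead of only the first one being raised.

```python
import asyncio


async def ok(value: str) -> str:
    await asyncio.sleep(0.01)
    return value


async def boom(message: str) -> str:
    raise ValueError(message)


async def demo() -> tuple[list[str], list[str]]:
    # Tasks are created in a fixed order, then awaited together.
    tasks = [
        asyncio.create_task(ok("a")),
        asyncio.create_task(boom("first failure")),
        asyncio.create_task(boom("second failure")),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [str(r) for r in results if isinstance(r, BaseException)]
    return values, errors
```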
Durable asyncio.wait#
DBOS.asyncio_wait() is a durable implementation of asyncio.wait() that checkpoints which tasks are done versus pending, ensuring deterministic recovery when workflows are interrupted. It supports all the same parameters as the standard library function:
Signature:
DBOS.asyncio_wait(
fs: List[Awaitable[Any]],
*,
timeout: Optional[float] = None,
return_when: str = asyncio.ALL_COMPLETED,
) -> tuple[set[asyncio.Task[Any]], set[asyncio.Task[Any]]]
Parameters:
- fs: List of awaitables (tasks, coroutines, or futures) to wait on
- timeout: Optional timeout in seconds. If specified, returns after the timeout even if not all tasks are complete.
- return_when: Controls when the function returns. Options include:
  - asyncio.ALL_COMPLETED (default): Wait for all tasks to complete
  - asyncio.FIRST_COMPLETED: Return as soon as any task completes
  - asyncio.FIRST_EXCEPTION: Return as soon as any task raises an exception
Returns:
A tuple of two sets: (done_tasks, pending_tasks) where done_tasks contains completed tasks and pending_tasks contains tasks that are still running.
Common Use Cases:
Waiting for the first step to complete:
@DBOS.step()
async def fast_step(val: str) -> str:
    return val + "_done"

@DBOS.step()
async def slow_step(val: str) -> str:
    await asyncio.sleep(10)
    return val + "_done"

@DBOS.workflow()
async def race_workflow() -> str:
    done, pending = await DBOS.asyncio_wait(
        [fast_step("fast"), slow_step("slow")],
        return_when=asyncio.FIRST_COMPLETED,
    )
    # Use the first result
    first_result = next(iter(done)).result()
    # Wait for remaining tasks to complete
    if pending:
        await DBOS.asyncio_wait(list(pending))
    return first_result
Waiting with a timeout:
@DBOS.workflow()
async def timeout_workflow() -> tuple[list[str], int]:
    done, pending = await DBOS.asyncio_wait(
        [step1(), step2(), step3()],
        timeout=5.0,
    )
    completed_results = [t.result() for t in done]
    pending_count = len(pending)
    # Wait for remaining tasks after timeout
    if pending:
        await DBOS.asyncio_wait(list(pending))
    return completed_results, pending_count
Waiting for the first exception:
@DBOS.workflow()
async def exception_handling_workflow() -> None:
    done, pending = await DBOS.asyncio_wait(
        [risky_step1(), risky_step2(), risky_step3()],
        return_when=asyncio.FIRST_EXCEPTION,
    )
    # Check for exceptions in completed tasks
    for task in done:
        if task.exception():
            # Handle the exception
            pass
    # Wait for remaining tasks
    if pending:
        await DBOS.asyncio_wait(list(pending))
Key Differences from asyncio.gather:
Unlike asyncio.gather(), which waits for all tasks to complete and returns their results as a list, DBOS.asyncio_wait() provides more granular control:
- Returns a tuple of (done, pending) sets instead of a list of results
- Supports timeouts and selective completion via the return_when parameter
- Does not automatically raise exceptions; you must check tasks for exceptions explicitly
- Better suited for scenarios where you need to handle partial completion or timeouts
When called outside a workflow, DBOS.asyncio_wait() falls back to the standard asyncio.wait() function.
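For comparison, this is what the standard asyncio.wait() fallback looks like on its own. The sketch uses plain coroutines with illustrative names and no DBOS checkpointing. Note that asyncio.wait() itself only accepts tasks or futures; passing bare coroutines raises TypeError on Python 3.11 and later.

```python
import asyncio


async def work(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def demo() -> tuple[str, int]:
    tasks = [
        asyncio.create_task(work("fast", 0.01)),
        asyncio.create_task(work("slow", 0.2)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = next(iter(done)).result()
    pending_count = len(pending)
    # Nothing is checkpointed here, so clean up the loser explicitly.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return first, pending_count
```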
Child Workflows#
Child workflows can be called directly or started asynchronously:
@DBOS.workflow()
async def parent_workflow() -> tuple[str, str, str]:
    parent_id = DBOS.workflow_id
    # Direct child call
    result = await child_workflow("test")
    # Start child workflow asynchronously
    child_handle = await DBOS.start_workflow_async(child_workflow, "async")
    child_result = await child_handle.get_result()
    return parent_id, child_handle.workflow_id, result + child_result
Communication Patterns#
Async workflows should use async versions of DBOS context methods:
# Send/receive messages
await DBOS.send_async(destination_id, message, topic)
message = await DBOS.recv_async(topic, timeout_seconds=60)
# Set/get events
await DBOS.set_event_async(key, value)
value = await DBOS.get_event_async(workflow_id, key)
# Sleep
await DBOS.sleep_async(10)
Idempotent Message Delivery#
Both send() and send_async() support an optional idempotency_key parameter for idempotent message delivery. This parameter ensures that duplicate sends with the same key are ignored:
# With idempotency key - prevents duplicate delivery
DBOS.send(
destination_id,
message,
topic,
idempotency_key="unique-key-123"
)
# Async version with idempotency key
await DBOS.send_async(
destination_id,
message,
topic,
idempotency_key="unique-key-456"
)
The idempotency_key parameter can be used in any context: from workflows, from steps, or from outside workflow contexts. When called from within a workflow, the workflow's own idempotency mechanism already handles deduplication, but an explicit idempotency_key can still be provided for additional control.
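The deduplication idea can be sketched with an in-memory mailbox. Everything here is hypothetical: the Mailbox class, the choice to scope keys per destination, and the boolean return value are assumptions for illustration, not a description of how DBOS stores or scopes idempotency keys.

```python
from collections import defaultdict
from typing import Any, Optional


class Mailbox:
    """Hypothetical in-memory mailbox; a real system would persist both structures."""

    def __init__(self) -> None:
        self._messages: dict[tuple[str, Optional[str]], list[Any]] = defaultdict(list)
        self._seen_keys: set[tuple[str, str]] = set()

    def send(
        self,
        destination_id: str,
        message: Any,
        topic: Optional[str] = None,
        *,
        idempotency_key: Optional[str] = None,
    ) -> bool:
        """Deliver the message; return False if the key was already used."""
        if idempotency_key is not None:
            dedup = (destination_id, idempotency_key)  # assumed scope: per destination
            if dedup in self._seen_keys:
                return False  # duplicate send is ignored
            self._seen_keys.add(dedup)
        self._messages[(destination_id, topic)].append(message)
        return True

    def inbox(self, destination_id: str, topic: Optional[str] = None) -> list[Any]:
        return list(self._messages[(destination_id, topic)])
```

Sends without a key are always delivered, which mirrors the fact that the parameter is optional.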
Waiting for Multiple Workflows#
DBOS.wait_first() and DBOS.wait_first_async() enable waiting for any one of multiple workflows to complete. These methods poll the database until at least one workflow's status is no longer PENDING, ENQUEUED, or DELAYED, then return the corresponding handle. This is useful for patterns like racing workflows, implementing timeouts, or processing results as they become available.
Basic Usage#
@DBOS.workflow()
async def coordinator_workflow() -> str:
    # Start multiple child workflows
    handles = [
        await DBOS.start_workflow_async(task_workflow, i)
        for i in range(3)
    ]
    # Wait for first to complete
    first_completed = await DBOS.wait_first_async(handles)
    return await first_completed.get_result()
Processing Tasks as They Complete#
wait_first() can be used in a loop to process workflow results in completion order rather than start order:
@DBOS.workflow()
def process_tasks() -> List[str]:
    # Start multiple concurrent tasks
    handles = [queue.enqueue(process_task, i) for i in range(5)]
    # Process results as tasks complete (not in start order)
    results = []
    remaining = list(handles)
    while remaining:
        completed = DBOS.wait_first(remaining)
        results.append(completed.get_result())
        remaining = [h for h in remaining if h.workflow_id != completed.workflow_id]
    return results
Parameters and Error Handling#
Both methods accept the following parameters:
- handles: List of workflow handles to wait on. Must not be empty and must not contain duplicate workflow IDs.
- polling_interval_sec: Optional polling interval for checking workflow completion status (defaults to DEFAULT_POLLING_INTERVAL).
Error Handling:
- Raises ValueError if the handles list is empty
- Raises ValueError if the handles list contains duplicate workflow IDs
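The polling and validation rules described in this section can be sketched as follows. This is a simplified model, not the DBOS implementation: wait_first_sketch works on plain workflow IDs and a caller-supplied get_status function (both assumptions) instead of real handles and the system database.

```python
import asyncio
from typing import Callable, Sequence

# Statuses that count as "still active" according to the description above.
ACTIVE = {"PENDING", "ENQUEUED", "DELAYED"}


async def wait_first_sketch(
    workflow_ids: Sequence[str],
    get_status: Callable[[str], str],
    polling_interval_sec: float = 0.01,
) -> str:
    if not workflow_ids:
        raise ValueError("handles must not be empty")
    if len(set(workflow_ids)) != len(workflow_ids):
        raise ValueError("handles must not contain duplicate workflow IDs")
    while True:
        for wfid in workflow_ids:
            if get_status(wfid) not in ACTIVE:
                return wfid  # first workflow observed in a terminal status
        await asyncio.sleep(polling_interval_sec)
```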
Transactions and Async Limitations#
Transaction Limitation#
Transactions explicitly do not support async functions. The code raises a DBOSException when a coroutine function is decorated with @DBOS.transaction():
if inspect.iscoroutinefunction(func):
    raise DBOSException(
        f"Function {transaction_name} is a coroutine function, but DBOS.transaction does not support coroutine functions"
    )
Technical Reasons:
- DBOS transactions use SQLAlchemy's synchronous session API, which requires blocking I/O operations
- Transactions must set isolation levels and manage connection state synchronously
- Complex retry logic for serialization failures would be more difficult to manage across async boundaries
Calling Transactions from Async Contexts#
When calling synchronous transactions from async contexts (such as async workflows), you should use asyncio.to_thread() to avoid blocking the event loop. If a transaction is called directly while an event loop is running, DBOS will log a warning to help identify this pattern:
@DBOS.transaction()
def my_db_transaction() -> str:
    rows = DBOS.sql_session.execute(sa.text("SELECT 1")).fetchall()
    return str(rows[0][0])

@DBOS.workflow()
async def async_workflow() -> str:
    # Recommended: Use asyncio.to_thread() to avoid blocking the event loop
    result = await asyncio.to_thread(my_db_transaction)
    return result
Warning Behavior:
If you call a transaction directly from an async context without using asyncio.to_thread(), DBOS will log a warning message:
Transaction {transaction_name} was called while an event loop is running. Invoke transactions from an async context using asyncio.to_thread to avoid blocking the event loop.
This warning helps developers identify potentially problematic patterns where synchronous transactions may block the event loop. While DBOS can handle the sync/async boundary internally, using asyncio.to_thread() is the recommended approach for proper async execution.
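One way such a check can be written is sketched below. It is an assumption-laden illustration: the decorator name warn_if_loop_running is hypothetical, and it uses the warnings module where DBOS logs a message. The key point is that asyncio.get_running_loop() raises RuntimeError when the current thread has no running loop, which is exactly the situation inside asyncio.to_thread().

```python
import asyncio
import functools
import warnings
from typing import Any, Callable


def warn_if_loop_running(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical guard for blocking functions called on an event loop thread."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop in this thread: safe to block
        else:
            warnings.warn(
                f"{func.__name__} was called while an event loop is running",
                RuntimeWarning,
                stacklevel=2,
            )
        return func(*args, **kwargs)

    return wrapper


@warn_if_loop_running
def read_balance() -> int:
    return 100  # stand-in for a synchronous database transaction


async def call_directly() -> int:
    return read_balance()  # runs on the loop thread, so it warns


async def call_in_thread() -> int:
    return await asyncio.to_thread(read_balance)  # worker thread, no warning
```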
Integration Examples#
FastAPI Integration#
The following example combines async workflows with FastAPI HTTP endpoints:
@asynccontextmanager
async def lifespan(app: FastAPI) -> Any:
    # Excerpt: `resource` is defined in an enclosing function
    nonlocal resource
    resource = 1
    yield
    resource = None

app = FastAPI(lifespan=lifespan)
DBOS(fastapi=app, config=config)
queue = Queue("queue")

@app.get("/")
@DBOS.workflow()
async def resource_workflow() -> Any:
    handle = await queue.enqueue_async(queue_workflow)
    return {
        "resource": resource,
        "loop": id(asyncio.get_event_loop()),
        "queue_loop": await handle.get_result(),
    }
Queue Integration#
Queue operations with async workflows enable background processing:
@DBOS.workflow()
async def test_workflow(var1: str, var2: str) -> str:
    var1 = await test_step(var1)
    return var1 + var2

@DBOS.step()
async def test_step(var: str) -> str:
    return var + "d"

queue = await DBOS.register_queue_async("test_queue")
# Immediate enqueueing
with SetWorkflowID(wfid):
    handle = await queue.enqueue_async(test_workflow, "abc", "123")
assert (await handle.get_result()) == "abcd123"
# Delayed enqueueing - workflow enters DELAYED status
with SetEnqueueOptions(delay_seconds=10.0):
    delayed_handle = await queue.enqueue_async(test_workflow, "xyz", "789")
# Workflow transitions to ENQUEUED after 10 seconds, then executes
assert (await delayed_handle.get_result()) == "xyzd789"
Async Queue Management#
DBOS provides async versions of queue management methods for use in async contexts:
Queue Registration, Retrieval, Deletion, and Listing:
- DBOS.register_queue_async() - async version of register_queue()
- DBOS.retrieve_queue_async() - async version of retrieve_queue()
- DBOS.delete_queue_async() - async version of delete_queue()
- DBOS.list_queues() - List all database-backed queues registered in the system database. Returns a list of Queue objects.
- DBOS.list_queues_async() - async version of list_queues()
- DBOSClient.register_queue_async(), retrieve_queue_async(), delete_queue_async(), list_queues(), list_queues_async() - corresponding client methods
Queue Property Getters and Setters:
Async getters:
- queue.get_concurrency_async()
- queue.get_worker_concurrency_async()
- queue.get_limiter_async()
- queue.get_priority_enabled_async()
- queue.get_partition_queue_async()
- queue.get_polling_interval_sec_async()
Async setters:
- queue.set_concurrency_async()
- queue.set_worker_concurrency_async()
- queue.set_limiter_async()
- queue.set_priority_enabled_async()
- queue.set_partition_queue_async()
- queue.set_polling_interval_sec_async()
Usage from Async Contexts:
When calling queue management methods from async code, use the async variants to avoid blocking the event loop. The synchronous methods issue warnings (or may raise errors) when called from a running asyncio event loop:
# In async code, use async methods
queue = await DBOS.register_queue_async(
"my_queue",
concurrency=10,
polling_interval_sec=0.5
)
# Get and set queue properties asynchronously
current_concurrency = await queue.get_concurrency_async()
await queue.set_concurrency_async(15)
# Clean up when done
await DBOS.delete_queue_async("my_queue")
The queue.enqueue_async() method was already available and continues to work as before for enqueueing workflows asynchronously.
Delayed Workflow Enqueueing#
Workflows can be enqueued with a delay using the delay_seconds parameter in SetEnqueueOptions. When a workflow is enqueued with a delay:
- DELAYED Status: The workflow enters the DELAYED status rather than ENQUEUED
- Not Immediately Available: The workflow is not available for dequeue until the delay expires
- Automatic Transition: After the delay period, the workflow automatically transitions to ENQUEUED status
- Queue Processing: Once ENQUEUED, the workflow can be dequeued and executed normally
Example:
queue = Queue("my_queue")

@DBOS.workflow()
async def delayed_task(data: str) -> str:
    return f"Processed: {data}"

# Enqueue workflow to execute after 60 seconds
with SetEnqueueOptions(delay_seconds=60.0):
    handle = await queue.enqueue_async(delayed_task, "important_data")
# Workflow is in DELAYED status (get_status() on an async handle must be awaited)
status = await handle.get_status()
assert status.status == "DELAYED"
assert status.delay_until_epoch_ms is not None
# After 60 seconds, workflow transitions to ENQUEUED and executes
result = await handle.get_result()
assert result == "Processed: important_data"
Delayed Workflow Behavior:
- Visibility: Delayed workflows are visible in list_workflows() and list_queued_workflows() results
- Cancellation: Delayed workflows can be cancelled before execution using cancel_workflow() or cancel_workflow_async()
- Resume: If a delayed workflow is cancelled, it can be resumed with resume_workflow(), which transitions it immediately to ENQUEUED (bypassing the original delay)
- wait_first(): Delayed workflows are treated as active workflows and will unblock wait_first() when they complete
- Deduplication: Delayed workflows participate in deduplication: attempting to enqueue a duplicate workflow with the same deduplication ID while one is in DELAYED status will fail
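The DELAYED to ENQUEUED lifecycle described above can be modelled with a few lines of plain Python. The QueuedWorkflow record and the enqueue and promote_due helpers are hypothetical names for illustration; only the status strings and the delay_until_epoch_ms field name come from this document.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class QueuedWorkflow:
    workflow_id: str
    status: str
    delay_until_epoch_ms: Optional[int] = None


def enqueue(workflow_id: str, now_ms: int, delay_seconds: Optional[float] = None) -> QueuedWorkflow:
    # No delay: the workflow is immediately eligible for dequeue.
    if delay_seconds is None:
        return QueuedWorkflow(workflow_id, "ENQUEUED")
    return QueuedWorkflow(workflow_id, "DELAYED", now_ms + int(delay_seconds * 1000))


def promote_due(workflows: list[QueuedWorkflow], now_ms: int) -> list[str]:
    """Move every DELAYED workflow whose delay has expired to ENQUEUED."""
    promoted = []
    for wf in workflows:
        if (
            wf.status == "DELAYED"
            and wf.delay_until_epoch_ms is not None
            and wf.delay_until_epoch_ms <= now_ms
        ):
            wf.status = "ENQUEUED"
            promoted.append(wf.workflow_id)
    return promoted
```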
Modifying Workflow Delays#
You can modify the delay on a DELAYED or ENQUEUED workflow using DBOS.set_workflow_delay() (sync) or DBOS.set_workflow_delay_async() (async). The method accepts the workflow_id and either delay_seconds (relative delay from now) or delay_until_epoch_ms (absolute timestamp in milliseconds). You must provide exactly one of these parameters, not both. The method only affects workflows in DELAYED or ENQUEUED status.
# Example: Enqueue with a long delay, then shorten it
with SetEnqueueOptions(delay_seconds=600.0):
    handle = queue.enqueue(my_workflow)
# Shorten the delay to 5 seconds
DBOS.set_workflow_delay(handle.workflow_id, delay_seconds=5.0)
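The "exactly one of delay_seconds or delay_until_epoch_ms" rule can be expressed as a small validation helper. This is a sketch under assumptions: resolve_delay_until_ms is a hypothetical function, and the exception type DBOS raises for invalid combinations is not stated in this section, so ValueError is used as a placeholder.

```python
from typing import Optional


def resolve_delay_until_ms(
    now_ms: int,
    *,
    delay_seconds: Optional[float] = None,
    delay_until_epoch_ms: Optional[int] = None,
) -> int:
    """Normalize either form of the delay into an absolute epoch timestamp in ms."""
    # True when both are given or both are missing.
    if (delay_seconds is None) == (delay_until_epoch_ms is None):
        raise ValueError("provide exactly one of delay_seconds or delay_until_epoch_ms")
    if delay_seconds is not None:
        return now_ms + int(delay_seconds * 1000)  # relative delay from now
    return delay_until_epoch_ms  # already absolute
```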
Background Workflow Execution#
The following excerpt from the Hacker News Research Agent example shows how to start durable background agents and query their status:
@app.post("/agents")
def start_agent(request: AgentStartRequest):
    # Start a durable agent in the background
    DBOS.start_workflow(agentic_research_workflow, request.topic)
    return {"ok": True}

@app.get("/agents", response_model=list[AgentStatus])
async def list_agents():
    # List all active agents
    agent_workflows = await DBOS.list_workflows_async(
        name=agentic_research_workflow.__qualname__,
        sort_desc=True,
    )
    # Retrieve their statuses concurrently
    statuses = await asyncio.gather(
        *[DBOS.get_event_async(w.workflow_id, AGENT_STATUS) for w in agent_workflows]
    )
    return statuses
Scheduled Workflows#
Async Scheduled Workflows#
Async scheduled workflows are supported, as introduced in PR #493 (October 20, 2025). This enables non-blocking scheduled operations:
@DBOS.workflow()
async def my_async_workflow(scheduled_time: datetime, context: Any) -> None:
    # Workflow logic here
    pass
# Schedule at runtime
DBOS.create_schedule(
schedule_name="my-schedule",
workflow_fn=my_async_workflow,
schedule="*/5 * * * * *", # Cron expression with seconds support
context={"env": "production"},
automatic_backfill=True, # Backfill missed executions on startup
cron_timezone="America/New_York", # Evaluate cron in specific timezone
)
Scheduled vs. Delayed Workflows#
DBOS Python provides two distinct mechanisms for deferred workflow execution:
Delayed Enqueueing (via delay_seconds):
- Use Case: One-time delayed execution of a workflow
- Configuration: Specify delay_seconds in SetEnqueueOptions or EnqueueOptions
- Status: Workflows enter DELAYED status and automatically transition to ENQUEUED after the delay
- Persistence: The delay is recorded with the workflow instance; once executed, it's complete
- Example: Retry a failed operation after 5 minutes, send a reminder email 1 hour from now
Scheduled Workflows (via cron expressions):
- Use Case: Recurring execution on a schedule
- Configuration: Use create_schedule() with a cron expression
- Status: Each scheduled invocation creates a new workflow instance
- Persistence: The schedule persists in the database and continues firing until explicitly deleted
- Example: Run a nightly cleanup job, send daily reports, poll an API every 15 minutes
When to Use Each:
- Use delayed enqueueing when you need to execute a specific workflow instance after a delay (e.g., implementing exponential backoff, scheduling a one-time reminder)
- Use scheduled workflows when you need recurring, time-based execution (e.g., periodic maintenance, regular data synchronization, daily batch processing)
Schedule Configuration#
The create_schedule() and create_schedule_async() methods accept several parameters:
- schedule_name: Unique name for the schedule
- workflow_fn: The workflow function to invoke (must accept (datetime, context))
- schedule: Cron expression with seconds support (6 fields)
- context: Optional context object passed to every invocation (defaults to None)
- automatic_backfill: Optional boolean (defaults to False). When enabled, the scheduler automatically backfills missed executions on startup based on when the schedule last fired.
- cron_timezone: Optional IANA timezone name (e.g., "America/New_York", "Europe/London", "Asia/Tokyo"). When specified, the cron expression is evaluated in that timezone instead of UTC (the default).
Last Fired Tracking#
The scheduler tracks when each schedule last executed successfully via the last_fired_at field. This timestamp enables the automatic backfill feature and helps with debugging and monitoring scheduled workflows. After each successful execution, last_fired_at is updated to the scheduled execution time.
Timezone-Aware Scheduling#
While UTC remains the default timezone for cron evaluation, the cron_timezone parameter allows async workflows to be scheduled according to local business hours in specific regions. This is especially useful for workflows that interact with time-sensitive external systems or services in specific geographical locations.
For example, a workflow that needs to run at midnight Eastern Time every day can be configured as:
DBOS.create_schedule(
schedule_name="daily-midnight-eastern",
workflow_fn=daily_task,
schedule="0 0 * * *", # Midnight
cron_timezone="America/New_York",
)
The scheduler will automatically handle daylight saving time transitions for the specified timezone.
Automatic Backfill#
When automatic_backfill is enabled, the scheduler will automatically enqueue any schedule instances that were missed while the service was down. On startup, the scheduler checks the last_fired_at timestamp for each schedule with automatic backfill enabled. If last_fired_at is set and is in the past, the scheduler backfills all missed executions between that time and the current time before resuming normal scheduled operation.
This ensures that scheduled workflows don't miss important executions due to service downtime or restarts.
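The backfill computation can be illustrated with a deliberately simplified model. The sketch below assumes a fixed firing interval instead of a cron expression, and the function name missed_fire_times is hypothetical; a real scheduler would iterate the cron schedule rather than add a timedelta.

```python
from datetime import datetime, timedelta, timezone


def missed_fire_times(
    last_fired_at: datetime, now: datetime, interval: timedelta
) -> list[datetime]:
    """Fire times strictly after last_fired_at and up to (including) now."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    missed = []
    next_fire = last_fired_at + interval
    while next_fire <= now:
        missed.append(next_fire)
        next_fire += interval
    return missed


if __name__ == "__main__":
    last = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc)
    now = datetime(2026, 1, 1, 0, 17, tzinfo=timezone.utc)
    # A 5-minute schedule that last fired at 00:00 missed 00:05, 00:10, and 00:15.
    for fire_time in missed_fire_times(last, now, timedelta(minutes=5)):
        print(fire_time.isoformat())
```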
Management APIs#
Dynamic scheduling provides extensive management capabilities:
# List all schedules
schedules = DBOS.list_schedules()
# Pause/resume schedules
DBOS.pause_schedule("my-schedule")
DBOS.resume_schedule("my-schedule")
# Trigger immediate execution
DBOS.trigger_schedule("my-schedule")
# Delete a schedule
DBOS.delete_schedule("my-schedule")
# Backfill missed executions
DBOS.backfill_schedule(
schedule_name="my-schedule",
start_time=datetime(2026, 1, 1),
end_time=datetime(2026, 1, 31),
)
Best Practices#
Use Async Context Methods#
Coroutine workflows should always use async versions of DBOS context methods:
- DBOS.sleep_async() instead of DBOS.sleep()
- DBOS.send_async() instead of DBOS.send()
- DBOS.recv_async() instead of DBOS.recv()
- DBOS.start_workflow_async() instead of DBOS.start_workflow()
Maintain Determinism#
Even async workflows must be deterministic. All non-deterministic operations (database access, API calls, random numbers, time) should be performed in steps, not directly in workflow functions.
Error Handling with asyncio.gather#
Always specify return_exceptions=True when using asyncio.gather():
results = await asyncio.gather(*tasks, return_exceptions=True)
Handle Synchronous Code#
Use asyncio.to_thread() to run synchronous functions from async workflows:
result = await asyncio.to_thread(sync_function, arg1, arg2)
Deterministic Ordering for Parallel Steps#
When running async steps in parallel, they must be started in a well-defined sequence before awaiting. Non-deterministic ordering (where execution depends on timing of previous steps) is not allowed.
Recovery and Error Handling#
Automatic Recovery#
If an async workflow is interrupted, it automatically resumes from the last completed step upon restart, maintaining the same recovery guarantees as synchronous workflows.
Workflow recovery is demonstrated in tests:
@DBOS.workflow()
async def test_workflow(var: str, var2: str) -> str:
    output = await test_step(var2)
    return var

# Execute workflow
with SetWorkflowID(workflow_id):
    assert (await test_workflow(value, value)) == value
# Simulate crash
# ... (set status to PENDING in database)
# Recover workflow
handles = DBOS._recover_pending_workflows()
assert recovered_result
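The idea behind resuming from the last completed step can be sketched with an in-memory checkpoint log. This is a conceptual model only: StepLog, its integer step IDs, and the sample workflow are assumptions for illustration, and DBOS persists checkpoints in its system database rather than in a dictionary.

```python
import asyncio
from typing import Any, Awaitable, Callable


class StepLog:
    """Hypothetical checkpoint store keyed by the step's position in the workflow."""

    def __init__(self) -> None:
        self.outputs: dict[int, Any] = {}
        self.executions = 0

    async def run_step(self, step_id: int, fn: Callable[[], Awaitable[Any]]) -> Any:
        if step_id in self.outputs:
            return self.outputs[step_id]  # replay: reuse the recorded output
        self.executions += 1
        result = await fn()
        self.outputs[step_id] = result  # checkpoint before moving on
        return result


async def step_a() -> str:
    return "a"


async def step_b() -> str:
    return "b"


async def workflow(log: StepLog, crash_after_first: bool = False) -> str:
    a = await log.run_step(0, step_a)
    if crash_after_first:
        raise RuntimeError("simulated crash")
    b = await log.run_step(1, step_b)
    return a + b
```

Re-running the workflow with the same log after the simulated crash skips step_a and only executes step_b. Keying checkpoints by position is also one way to see why steps need to start in a deterministic order.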
Timeout Handling#
@DBOS.workflow()
async def blocked_workflow() -> None:
    while True:
        await DBOS.sleep_async(0.1)

with SetWorkflowTimeout(0.1):
    with pytest.raises(DBOSAwaitedWorkflowCancelledError):
        await blocked_workflow()
Task Cancellation Protection#
Task cancellation protection ensures workflows complete despite caller cancellation:
@DBOS.workflow()
async def test_workflow(duration: float) -> str:
    await DBOS.sleep_async(duration)
    return "completed"

async def run_workflow_task() -> str:
    with SetWorkflowID(wfid):
        handle = await DBOS.start_workflow_async(test_workflow, 1.0)
    return await handle.get_result()

task = asyncio.create_task(run_workflow_task())
task.cancel()  # Cancel the task
# Workflow completes despite task cancellation
handle = await DBOS.retrieve_workflow_async(wfid)
assert await handle.get_result() == "completed"
Workflow Handle Resilience After Cancellation#
When a workflow is cancelled and later resumed, the original workflow handle automatically reflects the resumed workflow's execution. Both WorkflowHandle.get_result() and WorkflowHandleAsync.get_result() catch cancellation exceptions (DBOSWorkflowCancelledError and DBOSAwaitedWorkflowCancelledError) and automatically poll the system database to retrieve the workflow's final result. If the workflow was resumed after cancellation, the handle returns the result of the resumed execution instead of raising a cancellation exception.
Implementation Details:
When get_result() encounters a cancellation exception from the underlying future or task, it catches the exception and invokes await_workflow_result() (for sync handles) or await_workflow_result_async() (for async handles) to check the database for the workflow's current status. This allows the handle to transparently recover when workflows are cancelled and then resumed.
This behavior makes workflow handles more resilient—they reflect the eventual state of the workflow, including any resumptions, rather than just the initial execution attempt:
@DBOS.workflow()
async def simple_workflow(x: int) -> int:
    await step_one(x)
    await step_two(x)
    return x

# Start workflow and cancel it
wfid = str(uuid.uuid4())
with SetWorkflowID(wfid):
    cancelled_handle = await DBOS.start_workflow_async(simple_workflow, 42)
await DBOS.cancel_workflow_async(wfid)
# First get_result() call will raise DBOSAwaitedWorkflowCancelledError
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
    await cancelled_handle.get_result()
# Resume the workflow
resume_handle = await DBOS.resume_workflow_async(wfid)
assert (await resume_handle.get_result()) == 42
# The original cancelled_handle now returns the resumed workflow's result
# by catching the cancellation error and checking the database
assert (await cancelled_handle.get_result()) == 42
Note: A cancellation exception (DBOSWorkflowCancelledError or DBOSAwaitedWorkflowCancelledError) is only raised if the workflow remains cancelled and is not subsequently resumed.
Limitations and Considerations#
- Transactions Don't Support Async: At this time, DBOS does not support coroutine transactions. Decorating an async def function with @DBOS.transaction will raise an error at runtime.
- Event Loop Management: Calling a coroutine workflow or starting it with DBOS.start_workflow_async runs it in the same event loop as the caller. Enqueueing with enqueue_async starts the workflow in a different event loop.
- Deterministic Ordering Required: When running async steps in parallel, they must be started in a well-defined sequence before awaiting. Non-deterministic ordering (where execution depends on timing of previous steps) is not allowed.
- Serialization Requirements: Like synchronous workflows, async workflow inputs/outputs and step outputs must be serializable (default is pickle, customizable).
- Schedule Persistence: Removing code decorators does not stop dynamic schedules; they persist in the database. You must explicitly call DBOS.delete_schedule() to remove a schedule.
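The deterministic-ordering requirement in the list above can be illustrated with plain asyncio. The sketch below does not use DBOS; fetch is a hypothetical stand-in for an async step (in a real application it would presumably carry the @DBOS.step() decorator). The point is only the shape: every task is started in a fixed order before anything is awaited.

```python
import asyncio
from typing import List


async def fetch(item: int, delay: float) -> int:
    """Hypothetical stand-in for an async step doing I/O."""
    await asyncio.sleep(delay)
    return item * 2


async def deterministic_parallel(items: List[int]) -> List[int]:
    # Start every task in a fixed order (the order of `items`) before awaiting.
    # Delays are reversed on purpose so later tasks finish first.
    tasks = [
        asyncio.create_task(fetch(item, delay=0.01 * (len(items) - n)))
        for n, item in enumerate(items)
    ]
    # gather() returns results in start order, regardless of completion order.
    return list(await asyncio.gather(*tasks))
```

A pattern to avoid, by contrast, is deciding which step to start next based on which earlier step happened to finish first, since that order can differ between the original run and a recovery.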
Async Context Methods Reference#
DBOS provides async versions of most workflow management methods, including:
- start_workflow_async
- send_async / recv_async
- set_event_async / get_event_async
- sleep_async
- retrieve_workflow_async
- list_workflows_async
- write_stream_async / read_stream_async
- wait_first_async
- cancel_workflow_async / cancel_workflows_async
- resume_workflow_async / resume_workflows_async
- delete_workflow_async / delete_workflows_async
- fork_workflow_async
- set_workflow_delay_async
All async context methods return coroutines that must be awaited in async functions.
send / send_async#
Signature (async):
DBOS.send_async(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    *,
    idempotency_key: Optional[str] = None,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT
) -> None
Signature (sync):
DBOS.send(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    *,
    idempotency_key: Optional[str] = None,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT
) -> None
Send a message to a workflow identified by destination_id. The message can optionally be associated with a topic for filtered receiving. Messages can be sent from workflows, steps, or outside workflow contexts.
Parameters:
- destination_id: The workflow ID to send the message to.
- message: The message payload to send.
- topic: Optional topic string for message filtering. If not specified, uses a default topic.
- idempotency_key: Optional unique identifier for idempotent message delivery. If provided, duplicate sends with the same key are ignored. Messages are deduplicated based on the message_uuid (which is set to idempotency_key if provided, or auto-generated otherwise).
- serialization_type: Optional serialization format for the message.
Implementation Details:
When called from within a workflow context, send() records the message as a workflow step, leveraging the workflow's built-in idempotency guarantees. When called outside a workflow context (e.g., from a non-workflow function or external code), send() directly inserts the message into the database with an idempotent constraint on message_uuid.
Previously, send() calls made outside a workflow context created a temporary workflow; that approach has been replaced by direct database insertion. Received messages are now marked as "consumed" rather than deleted, which allows message_uuid to serve as an idempotency key.
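To make the deduplication idea concrete, here is a minimal sketch using an in-memory SQLite database. It is not DBOS's schema or code: the notifications table, its columns, and the send/recv helpers are all assumptions chosen for illustration. It shows why marking rows as consumed (instead of deleting them) keeps a unique message_uuid usable as an idempotency key.

```python
import sqlite3
import uuid
from typing import Optional


def make_db() -> sqlite3.Connection:
    """Create a hypothetical messages table with a uniqueness constraint."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE notifications (
            message_uuid   TEXT PRIMARY KEY,
            destination_id TEXT NOT NULL,
            topic          TEXT,
            message        TEXT NOT NULL,
            consumed       INTEGER NOT NULL DEFAULT 0
        )
        """
    )
    return conn


def send(conn: sqlite3.Connection, destination_id: str, message: str,
         topic: Optional[str] = None,
         idempotency_key: Optional[str] = None) -> bool:
    """Insert a message; return False if the key was already used."""
    message_uuid = idempotency_key or str(uuid.uuid4())
    cur = conn.execute(
        "INSERT OR IGNORE INTO notifications "
        "(message_uuid, destination_id, topic, message) VALUES (?, ?, ?, ?)",
        (message_uuid, destination_id, topic, message),
    )
    conn.commit()
    return cur.rowcount == 1


def recv(conn: sqlite3.Connection, destination_id: str,
         topic: Optional[str] = None) -> Optional[str]:
    """Return the oldest unconsumed message and mark it consumed."""
    row = conn.execute(
        "SELECT message_uuid, message FROM notifications "
        "WHERE destination_id = ? AND topic IS ? AND consumed = 0 "
        "ORDER BY rowid LIMIT 1",
        (destination_id, topic),
    ).fetchone()
    if row is None:
        return None
    # Mark as consumed instead of deleting, so the key keeps blocking duplicates.
    conn.execute(
        "UPDATE notifications SET consumed = 1 WHERE message_uuid = ?", (row[0],)
    )
    conn.commit()
    return row[1]
```

Because the consumed row stays in the table, a retried send with the same key is still ignored even after the message has been received.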
wait_first / wait_first_async#
Signature (async):
DBOS.wait_first_async(
    handles: List[WorkflowHandleAsync[Any]],
    *,
    polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
) -> WorkflowHandleAsync[Any]
Signature (sync):
DBOS.wait_first(
    handles: List[WorkflowHandle[Any]],
    *,
    polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
) -> WorkflowHandle[Any]
Wait for any one of the given workflow handles to complete and return the first handle that completes. These methods poll the database until at least one workflow's status is no longer PENDING, ENQUEUED, or DELAYED, then return the corresponding handle.
Parameters:
- handles: List of workflow handles to wait on. Must not be empty and must not contain duplicate workflow IDs.
- polling_interval_sec: Optional polling interval for checking workflow completion status. Defaults to DEFAULT_POLLING_INTERVAL.
Returns:
The first workflow handle that completes.
Raises:
ValueError if the handles list is empty or contains duplicate workflow IDs.
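The polling behavior and input validation described above can be sketched without DBOS. In this illustration, workflow IDs stand in for handles and get_status is a hypothetical callback standing in for a database status lookup; neither is part of the real API.

```python
import time
from typing import Callable, List

# Statuses that mean "not finished yet", as listed in the description above.
ACTIVE_STATUSES = {"PENDING", "ENQUEUED", "DELAYED"}


def wait_first(workflow_ids: List[str],
               get_status: Callable[[str], str],
               polling_interval_sec: float = 0.01) -> str:
    """Poll until one workflow leaves the active statuses; return its ID."""
    if not workflow_ids:
        raise ValueError("handles must not be empty")
    if len(set(workflow_ids)) != len(workflow_ids):
        raise ValueError("handles must not contain duplicate workflow IDs")
    while True:
        for workflow_id in workflow_ids:
            if get_status(workflow_id) not in ACTIVE_STATUSES:
                return workflow_id
        time.sleep(polling_interval_sec)  # nothing finished yet; poll again
```

Note that any non-active status ends the wait in this sketch, so a caller would still need to inspect the returned workflow to distinguish success from failure or cancellation.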
Example - Racing workflows:
@DBOS.workflow()
async def fetch_from_api_a() -> str:
    # Fetch data from API A
    return await some_async_call()
@DBOS.workflow()
async def fetch_from_api_b() -> str:
    # Fetch data from API B
    return await another_async_call()
# Start both workflows concurrently
handle_a = await DBOS.start_workflow_async(fetch_from_api_a)
handle_b = await DBOS.start_workflow_async(fetch_from_api_b)
# Use whichever completes first
first_completed = await DBOS.wait_first_async([handle_a, handle_b])
result = await first_completed.get_result()
Example - Processing tasks as they complete:
@DBOS.workflow()
def process_tasks() -> List[str]:
    # Start multiple concurrent tasks
    handles = [queue.enqueue(process_task, i) for i in range(5)]
    # Process results as tasks complete (not in start order)
    results = []
    remaining = list(handles)
    while remaining:
        completed = DBOS.wait_first(remaining)
        results.append(completed.get_result())
        remaining = [h for h in remaining if h.workflow_id != completed.workflow_id]
    return results
cancel_workflow / cancel_workflow_async#
Signature (async):
DBOS.cancel_workflow_async(workflow_id: str) -> None
Signature (sync):
DBOS.cancel_workflow(workflow_id: str) -> None
Cancel a workflow by ID. The workflow's status is set to CANCELLED and it is removed from any queue, but only if the workflow is not already complete.
Parameters:
workflow_id: The workflow ID to cancel.
Example:
# Synchronous cancellation
DBOS.cancel_workflow(workflow_id)
# Asynchronous cancellation
await DBOS.cancel_workflow_async(workflow_id)
Cancellation Behavior:
By default, when a workflow is cancelled, step execution is not preempted. The workflow continues running until its current step completes, then is preempted at the start of the next step. For async steps that may run for an extended duration, you can use the preemptible option to change this behavior:
@DBOS.step(preemptible=True)
async def long_running_step():
    # This step will be cancelled immediately (or after a short polling interval)
    # if its workflow is cancelled
    await asyncio.sleep(60)
    return "done"
When preemptible=True, the step and workflow are cancelled immediately (or after a short polling interval) when the workflow is cancelled, rather than waiting for the step to complete.
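The difference between the two modes can be modeled with plain asyncio. The sketch below is an illustration only and does not reflect how DBOS implements cancellation: run_workflow is a hypothetical runner, and an asyncio.Event stands in for the cancellation signal that the real system would observe through its database.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Step = Callable[[], Awaitable[str]]


async def run_workflow(steps: List[Tuple[Step, bool]],
                       cancelled: asyncio.Event,
                       log: List[str]) -> str:
    """Run (step, preemptible) pairs in order, honoring a cancellation signal."""
    for step, preemptible in steps:
        if cancelled.is_set():
            return "CANCELLED"  # preempted at the start of the next step
        if not preemptible:
            log.append(await step())  # runs to completion even if cancelled
            continue
        # Preemptible: race the step against the cancellation signal.
        step_task = asyncio.ensure_future(step())
        watcher = asyncio.ensure_future(cancelled.wait())
        await asyncio.wait({step_task, watcher},
                           return_when=asyncio.FIRST_COMPLETED)
        if not step_task.done():
            step_task.cancel()
            await asyncio.gather(step_task, return_exceptions=True)
            return "CANCELLED"
        watcher.cancel()
        log.append(step_task.result())
    return "SUCCESS"


async def slow_step() -> str:
    await asyncio.sleep(0.05)
    return "slow step finished"


async def demo(preemptible: bool) -> Tuple[str, List[str]]:
    cancelled = asyncio.Event()
    log: List[str] = []
    workflow = asyncio.ensure_future(
        run_workflow([(slow_step, preemptible), (slow_step, False)], cancelled, log)
    )
    await asyncio.sleep(0.01)  # cancel while the first step is still running
    cancelled.set()
    return await workflow, log
```

With preemptible set to False the first step finishes and is logged before the workflow stops; with True the first step is interrupted and nothing is logged.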
cancel_workflows / cancel_workflows_async#
Signature (async):
DBOS.cancel_workflows_async(workflow_ids: List[str]) -> None
Signature (sync):
DBOS.cancel_workflows(workflow_ids: List[str]) -> None
Cancel multiple workflows at once by providing a list of workflow IDs. This is more efficient than calling cancel_workflow() repeatedly for each workflow. All specified workflows will have their status set to CANCELLED and be removed from any queues, but only if they are not already complete.
Parameters:
workflow_ids: List of workflow IDs to cancel.
Example:
# Cancel multiple workflows efficiently
workflow_ids = [wf1_id, wf2_id, wf3_id]
DBOS.cancel_workflows(workflow_ids)
# Async version
await DBOS.cancel_workflows_async(workflow_ids)
resume_workflow / resume_workflow_async#
Signature (async):
DBOS.resume_workflow_async(
    workflow_id: str,
    *,
    queue_name: Optional[str] = None,
) -> WorkflowHandleAsync[Any]
Signature (sync):
DBOS.resume_workflow(
    workflow_id: str,
    *,
    queue_name: Optional[str] = None,
) -> WorkflowHandle[Any]
Resume a cancelled workflow by ID. The workflow's status is set to ENQUEUED and its recovery attempts and deadline are cleared, but only if the workflow is not already complete. Returns a handle to the resumed workflow.
Parameters:
- workflow_id: The workflow ID to resume.
- queue_name: Optional queue name to enqueue the resumed workflow to. If not specified, the workflow is enqueued to the internal queue.
Returns:
A workflow handle for the resumed workflow.
Example:
# Resume a cancelled workflow to the internal queue
handle = DBOS.resume_workflow(workflow_id)
result = handle.get_result()
# Resume to a specific queue
handle = DBOS.resume_workflow(workflow_id, queue_name="my_queue")
result = handle.get_result()
# Async version
handle = await DBOS.resume_workflow_async(workflow_id, queue_name="my_queue")
result = await handle.get_result()
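The "only if the workflow is not already complete" guard shared by cancel and resume can be sketched as a pair of status transitions on an in-memory row. Everything here is a hypothetical model: WorkflowRow, the COMPLETED status names, and the INTERNAL_QUEUE placeholder are assumptions, not DBOS internals.

```python
from dataclasses import dataclass
from typing import Dict, Optional

COMPLETED = {"SUCCESS", "ERROR"}     # assumed names for terminal statuses
INTERNAL_QUEUE = "_internal_queue"   # hypothetical placeholder name


@dataclass
class WorkflowRow:
    status: str
    queue_name: Optional[str] = None
    recovery_attempts: int = 0
    deadline_epoch_ms: Optional[int] = None


def cancel_workflow(rows: Dict[str, WorkflowRow], workflow_id: str) -> None:
    row = rows[workflow_id]
    if row.status in COMPLETED:
        return  # completed workflows are left untouched
    row.status = "CANCELLED"
    row.queue_name = None  # removed from any queue


def resume_workflow(rows: Dict[str, WorkflowRow], workflow_id: str,
                    queue_name: Optional[str] = None) -> None:
    row = rows[workflow_id]
    if row.status in COMPLETED:
        return  # nothing to resume
    row.status = "ENQUEUED"
    row.queue_name = queue_name or INTERNAL_QUEUE
    row.recovery_attempts = 0      # recovery attempts are cleared
    row.deadline_epoch_ms = None   # deadline is cleared
```

Both operations are no-ops on completed workflows in this model, which is what makes repeating them safe.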
resume_workflows / resume_workflows_async#
Signature (async):
DBOS.resume_workflows_async(
    workflow_ids: List[str],
    *,
    queue_name: Optional[str] = None,
) -> List[WorkflowHandleAsync[Any]]
Signature (sync):
DBOS.resume_workflows(
    workflow_ids: List[str],
    *,
    queue_name: Optional[str] = None,
) -> List[WorkflowHandle[Any]]
Resume multiple cancelled workflows at once by providing a list of workflow IDs. This is more efficient than calling resume_workflow() repeatedly for each workflow. All specified workflows will have their status set to ENQUEUED and their recovery attempts and deadline cleared, but only if they are not already complete. Returns a list of handles corresponding to the resumed workflows.
Parameters:
- workflow_ids: List of workflow IDs to resume.
- queue_name: Optional queue name to enqueue the resumed workflows to. If not specified, workflows are enqueued to the internal queue.
Returns:
A list of workflow handles for the resumed workflows, in the same order as the input workflow IDs.
Example:
# Resume multiple workflows to the internal queue
workflow_ids = [wf1_id, wf2_id, wf3_id]
handles = DBOS.resume_workflows(workflow_ids)
results = [h.get_result() for h in handles]
# Resume to a specific queue
handles = DBOS.resume_workflows(workflow_ids, queue_name="my_queue")
results = [h.get_result() for h in handles]
# Async version
handles = await DBOS.resume_workflows_async(workflow_ids, queue_name="my_queue")
results = [await h.get_result() for h in handles]
delete_workflow / delete_workflow_async#
Signature (async):
DBOS.delete_workflow_async(
    workflow_id: str,
    *,
    delete_children: bool = False
) -> None
Signature (sync):
DBOS.delete_workflow(
    workflow_id: str,
    *,
    delete_children: bool = False
) -> None
Delete a workflow and all its associated data by ID. If delete_children is True, also recursively deletes all child workflows.
Parameters:
- workflow_id: The workflow ID to delete.
- delete_children: If True, also delete all child workflows recursively. Defaults to False.
Example:
# Delete a workflow and all its children
DBOS.delete_workflow(workflow_id, delete_children=True)
# Async version
await DBOS.delete_workflow_async(workflow_id, delete_children=True)
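The effect of delete_children can be pictured as a traversal of the parent-to-child relationship. The sketch below is a simplified model using dictionaries; the workflows and children mappings and the return value are hypothetical and not how DBOS stores or reports this data.

```python
from typing import Dict, List, Set


def delete_workflow(workflows: Dict[str, dict],
                    children: Dict[str, List[str]],
                    workflow_id: str,
                    delete_children: bool = False) -> Set[str]:
    """Delete a workflow (and optionally all descendants); return deleted IDs."""
    deleted: Set[str] = set()
    seen: Set[str] = set()
    stack = [workflow_id]
    while stack:
        current = stack.pop()
        if current in seen:
            continue  # guard against revisiting the same workflow
        seen.add(current)
        if workflows.pop(current, None) is not None:
            deleted.add(current)
        if delete_children:
            # Descend into child workflows, then their children, and so on.
            stack.extend(children.get(current, []))
    return deleted
```

With delete_children left at False only the named workflow is removed, and its child workflows remain in place.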
delete_workflows / delete_workflows_async#
Signature (async):
DBOS.delete_workflows_async(
    workflow_ids: List[str],
    *,
    delete_children: bool = False
) -> None
Signature (sync):
DBOS.delete_workflows(
    workflow_ids: List[str],
    *,
    delete_children: bool = False
) -> None
Delete multiple workflows and all their associated data at once by providing a list of workflow IDs. This is more efficient than calling delete_workflow() repeatedly for each workflow. If delete_children is True, also recursively deletes all child workflows of each specified workflow.
Parameters:
- workflow_ids: List of workflow IDs to delete.
- delete_children: If True, also delete all child workflows of each specified workflow recursively. Defaults to False.
Example:
# Delete multiple workflows efficiently
workflow_ids = [wf1_id, wf2_id, wf3_id]
DBOS.delete_workflows(workflow_ids)
# Delete workflows and all their children
DBOS.delete_workflows(workflow_ids, delete_children=True)
# Async version
await DBOS.delete_workflows_async(workflow_ids, delete_children=True)
fork_workflow / fork_workflow_async#
Signature (async):
DBOS.fork_workflow_async(
    workflow_id: str,
    start_step: int,
    *,
    application_version: Optional[str] = None,
    queue_name: Optional[str] = None,
    queue_partition_key: Optional[str] = None,
) -> WorkflowHandleAsync[Any]
Signature (sync):
DBOS.fork_workflow(
    workflow_id: str,
    start_step: int,
    *,
    application_version: Optional[str] = None,
    queue_name: Optional[str] = None,
    queue_partition_key: Optional[str] = None,
) -> WorkflowHandle[Any]
Restart a workflow with a new workflow ID from a specific step. This creates a new workflow instance that begins execution from the specified step, using the original workflow's inputs and state.
Parameters:
- workflow_id: The original workflow ID to fork from.
- start_step: The step number from which to start the forked workflow.
- application_version: Optional application version for the forked workflow. If not specified, uses the current application version.
- queue_name: Optional queue name to enqueue the forked workflow to. If not specified, the workflow is enqueued to the internal queue.
- queue_partition_key: Optional partition key for the forked workflow within the specified queue. Only used if queue_name is also specified.
Returns:
A workflow handle for the forked workflow.
Example:
# Fork a workflow from step 2 to the internal queue
forked_handle = DBOS.fork_workflow(original_workflow_id, 2)
result = forked_handle.get_result()
# Fork to a specific queue with a partition key
forked_handle = DBOS.fork_workflow(
    original_workflow_id,
    2,
    queue_name="my_queue",
    queue_partition_key="partition_1"
)
result = forked_handle.get_result()
# Async version
forked_handle = await DBOS.fork_workflow_async(
    original_workflow_id,
    2,
    queue_name="my_queue",
    queue_partition_key="partition_1"
)
result = await forked_handle.get_result()
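One way to think about forking is as copying the recorded outputs of earlier steps to a new workflow ID and re-executing from start_step onward. The sketch below models that idea in memory. It rests on stated assumptions: that steps before start_step are replayed from recorded outputs, and that steps are numbered from 1; the source does not confirm either detail, and run_steps, fork_workflow, and the recorded mapping are hypothetical names.

```python
import uuid
from typing import Any, Callable, Dict, List, Optional

# Hypothetical record of step outputs: workflow ID -> {step number: output}.
Recorded = Dict[str, Dict[int, Any]]


def run_steps(recorded: Recorded, workflow_id: str,
              steps: List[Callable[[Any], Any]], value: Any) -> Any:
    """Run steps in order, reusing any output already recorded for this ID."""
    outputs = recorded.setdefault(workflow_id, {})
    for number, step in enumerate(steps, start=1):  # 1-based: an assumption
        if number in outputs:
            value = outputs[number]   # replay the recorded output
        else:
            value = step(value)       # execute the step and record its output
            outputs[number] = value
    return value


def fork_workflow(recorded: Recorded, original_id: str, start_step: int,
                  new_id: Optional[str] = None) -> str:
    """Create a new workflow ID that inherits outputs of steps before start_step."""
    forked_id = new_id or str(uuid.uuid4())
    recorded[forked_id] = {
        number: output
        for number, output in recorded[original_id].items()
        if number < start_step
    }
    return forked_id
```

In this model, running the forked ID skips the inherited steps and executes the rest, while the original workflow's record is left unchanged.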
set_workflow_delay / set_workflow_delay_async#
Signature (async):
DBOS.set_workflow_delay_async(
    workflow_id: str,
    *,
    delay_seconds: Optional[float] = None,
    delay_until_epoch_ms: Optional[int] = None,
) -> None
Signature (sync):
DBOS.set_workflow_delay(
    workflow_id: str,
    *,
    delay_seconds: Optional[float] = None,
    delay_until_epoch_ms: Optional[int] = None,
) -> None
Set or update the delay on a workflow. Provide exactly one of delay_seconds (relative) or delay_until_epoch_ms (absolute). Only affects DELAYED or ENQUEUED workflows.
Parameters:
- workflow_id: The workflow ID whose delay should be modified.
- delay_seconds: Optional relative delay in seconds from now. If specified, the workflow will be delayed until current_time + delay_seconds.
- delay_until_epoch_ms: Optional absolute timestamp in milliseconds since epoch. If specified, the workflow will be delayed until this exact time.
Returns:
None
Example:
# Enqueue with a long delay, then shorten it
with SetEnqueueOptions(delay_seconds=600.0):
    handle = queue.enqueue(my_workflow)
# Shorten the delay to 5 seconds using relative time
DBOS.set_workflow_delay(handle.workflow_id, delay_seconds=5.0)
# Or set an absolute timestamp (500ms from now)
soon = int(time.time() * 1000) + 500
DBOS.set_workflow_delay(handle.workflow_id, delay_until_epoch_ms=soon)
# Async version
await DBOS.set_workflow_delay_async(handle.workflow_id, delay_seconds=5.0)
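The "exactly one of" rule for the two delay arguments, and the conversion of a relative delay to an absolute timestamp, can be sketched as a small helper. resolve_delay_until_ms is a hypothetical function written for illustration; whether DBOS raises ValueError in this situation is an assumption, and the now_ms parameter exists only to make the sketch testable.

```python
import time
from typing import Optional


def resolve_delay_until_ms(*,
                           delay_seconds: Optional[float] = None,
                           delay_until_epoch_ms: Optional[int] = None,
                           now_ms: Optional[int] = None) -> int:
    """Return the absolute epoch-millisecond time a workflow is delayed until."""
    # Exactly one of the two arguments must be given (ValueError is an assumption).
    if (delay_seconds is None) == (delay_until_epoch_ms is None):
        raise ValueError(
            "Provide exactly one of delay_seconds or delay_until_epoch_ms"
        )
    if delay_until_epoch_ms is not None:
        return delay_until_epoch_ms  # already absolute
    current = int(time.time() * 1000) if now_ms is None else now_ms
    return current + int(delay_seconds * 1000)  # relative -> absolute
```

Normalizing both forms to a single absolute timestamp means the stored value has one interpretation, whichever argument the caller used.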
Relevant Code Files#
| File | Description |
|---|---|
| dbos/_core.py | Core workflow and step execution logic, including async/sync detection and execution paths |
| dbos/_dbos.py | Main DBOS class with workflow, step, and transaction decorators |
| dbos/_event_loop.py | BackgroundEventLoop implementation for managing async execution |
| dbos/_scheduler.py | Dynamic scheduler implementation for scheduled workflows |
| tests/test_async.py | Comprehensive test suite for async workflow patterns |
| tests/test_concurrency.py | Tests for concurrent async operations and patterns |
Related Topics#
- Workflow Durability and Recovery: How DBOS maintains workflow state and recovers from failures
- Workflow Scheduling: Cron-based scheduling of workflows with async support
- Queues: Enqueuing workflows for background execution
- Steps: Reusable operations within workflows with retry logic
- Transactions: Database transactions for ACID guarantees (sync-only)