System Database Schema#
Lead Section#
The System Database Schema in DBOS Transact Python is a comprehensive database design that provides persistent storage for workflow execution state, schedules, and metadata. This schema enables durable workflow execution with recovery capabilities, allowing workflows to survive application restarts and failures.
The system database consists of ten core tables stored in the "dbos" schema (PostgreSQL) or directly in the database (SQLite). These tables track workflow execution status, operation outputs, notifications, events, streams, scheduled workflows, application versions, and queue configurations. The schema is maintained through a versioned migration system that automatically applies incremental updates during DBOS initialization.
DBOS maintains a clear separation between the system database (framework metadata) and the application database (user data), each with its own connection URL and schema. The system database supports both PostgreSQL and SQLite backends, with PostgreSQL recommended for production use.
Core Tables#
workflow_status#
The workflow_status table is the central table for tracking all workflow executions. It stores comprehensive metadata about each workflow's lifecycle, execution state, and relationships.
Primary Key: workflow_uuid (TEXT)
Core Columns:
- status (TEXT) - Current workflow status: PENDING, SUCCESS, ERROR, CANCELLED, ENQUEUED, DELAYED, MAX_RECOVERY_ATTEMPTS_EXCEEDED
- name (TEXT) - Fully-qualified workflow function name
- inputs (TEXT) - Serialized workflow input arguments
- output (TEXT) - Serialized workflow return value
- error (TEXT) - Serialized error information if workflow failed
Authentication & Identity:
- authenticated_user (TEXT) - User who initiated the workflow
- assumed_role (TEXT) - Role assumed for execution
- authenticated_roles (TEXT) - Available roles for the user
Execution Tracking:
- executor_id (TEXT) - Identifier of the executor instance running the workflow
- created_at (BIGINT) - Timestamp in milliseconds when workflow was created
- updated_at (BIGINT) - Last update timestamp
- started_at_epoch_ms (BIGINT) - When workflow execution actually started
Application Context:
- application_version (TEXT) - Version of the application. Automatically computed from workflow source code (MD5 hash), or defaults to 'DEFAULT_VERSION' if source code inspection fails. Can be manually set via the application_version field in DBOSConfig.
- application_id (TEXT) - Application identifier
- class_name (VARCHAR(255)) - Class name for static class method workflows
- config_name (VARCHAR(255)) - Configuration name
Workflow Relationships:
- parent_workflow_id (TEXT) - Parent workflow UUID for child workflows
- forked_from (TEXT) - UUID of the workflow this one was forked from
- was_forked_from (BOOLEAN, NOT NULL, DEFAULT FALSE) - True if another workflow has ever been forked from this workflow, false otherwise
- owner_xid (TEXT) - Transaction identifier
Queue & Recovery:
- queue_name (TEXT) - Queue name for queued workflows
- deduplication_id (TEXT) - For queue deduplication
- priority (INTEGER) - Execution priority (default 0)
- queue_partition_key (TEXT) - Partition key for queue distribution
- recovery_attempts (BIGINT) - Number of recovery attempts (default 0)
- delay_until_epoch_ms (BIGINT, nullable) - Epoch timestamp in milliseconds until which a workflow should remain in DELAYED status. When the current time exceeds this value, the workflow transitions from DELAYED to ENQUEUED. NULL for workflows without a delay.
Timeout Management:
- workflow_timeout_ms (BIGINT) - Workflow timeout in milliseconds
- workflow_deadline_epoch_ms (BIGINT) - Absolute deadline timestamp
Serialization:
- serialization (TEXT) - Serialization format metadata
Indexes:
- workflow_status_created_at_index on created_at - For time-based queries
- workflow_status_executor_id_index on executor_id - For executor-specific queries
- workflow_status_status_index on status - For filtering by status
- idx_workflow_status_queue_status_started on (queue_name, status, started_at_epoch_ms) - For queue operations
- idx_workflow_status_forked_from on forked_from - Partial index (WHERE forked_from IS NOT NULL) for tracking forked workflows
- idx_workflow_status_parent_workflow_id on parent_workflow_id - Partial index (WHERE parent_workflow_id IS NOT NULL) for parent-child relationships
- idx_workflow_status_delayed on delay_until_epoch_ms - Partial index (WHERE status = 'DELAYED') for efficiently querying delayed workflows that are ready to transition to ENQUEUED
Constraints:
- Unique constraint on (queue_name, deduplication_id) for queue deduplication
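The effect of the unique constraint can be illustrated with a small, self-contained sketch. It uses an in-memory SQLite table that carries only a few of the workflow_status columns; the helper name try_enqueue is hypothetical and is not part of the DBOS API.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for workflow_status; only the columns needed here.
conn.execute(
    """
    CREATE TABLE workflow_status (
        workflow_uuid TEXT PRIMARY KEY,
        status TEXT,
        queue_name TEXT,
        deduplication_id TEXT,
        UNIQUE (queue_name, deduplication_id)
    )
    """
)


def try_enqueue(workflow_uuid, queue_name, deduplication_id):
    """Hypothetical helper: True if the row was inserted, False on a duplicate."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO workflow_status VALUES (?, 'ENQUEUED', ?, ?)",
                (workflow_uuid, queue_name, deduplication_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = try_enqueue("wf-1", "emails", "user-42")
duplicate = try_enqueue("wf-2", "emails", "user-42")    # same queue and ID
other_queue = try_enqueue("wf-3", "reports", "user-42")  # same ID, other queue
# NULL deduplication IDs never collide, so undeduplicated workflows coexist.
no_dedup_a = try_enqueue("wf-4", "emails", None)
no_dedup_b = try_enqueue("wf-5", "emails", None)
```

Because NULL values are treated as distinct in a unique constraint, workflows enqueued without a deduplication_id are unaffected by it.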
operation_outputs#
The operation_outputs table stores the results of individual operations (steps) within workflows. This enables precise replay during workflow recovery.
Primary Key: (workflow_uuid, function_id)
Columns:
- workflow_uuid (TEXT, FOREIGN KEY → workflow_status) - Parent workflow
- function_id (INTEGER) - Sequential step identifier within workflow
- function_name (TEXT) - Name of the operation function
- output (TEXT) - Serialized operation output
- error (TEXT) - Serialized error if operation failed
- child_workflow_id (TEXT) - UUID of spawned child workflow (if applicable)
- started_at_epoch_ms (BIGINT) - Operation start time
- completed_at_epoch_ms (BIGINT) - Operation completion time
- serialization (TEXT) - Serialization format metadata
Indexes:
- idx_operation_outputs_completed_at_function_name on (completed_at_epoch_ms, function_name) - Optimizes queries that filter or sort by completion time and function name
Foreign Key: ON DELETE CASCADE - Automatically deletes operation outputs when parent workflow is deleted
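As a rough illustration of how recorded step outputs support replay and how the cascading foreign key cleans them up, the sketch below uses an in-memory SQLite database with a reduced set of columns. The recorded_output helper is hypothetical; the actual lookup logic in DBOS may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.execute("CREATE TABLE workflow_status (workflow_uuid TEXT PRIMARY KEY)")
conn.execute(
    """
    CREATE TABLE operation_outputs (
        workflow_uuid TEXT NOT NULL
            REFERENCES workflow_status (workflow_uuid) ON DELETE CASCADE,
        function_id INTEGER NOT NULL,
        function_name TEXT,
        output TEXT,
        PRIMARY KEY (workflow_uuid, function_id)
    )
    """
)


def recorded_output(workflow_uuid, function_id):
    """Hypothetical helper: return the stored output, or None if the step never ran."""
    row = conn.execute(
        "SELECT output FROM operation_outputs "
        "WHERE workflow_uuid = ? AND function_id = ?",
        (workflow_uuid, function_id),
    ).fetchone()
    return None if row is None else row[0]


with conn:
    conn.execute("INSERT INTO workflow_status VALUES ('wf-1')")
    conn.executemany(
        "INSERT INTO operation_outputs VALUES ('wf-1', ?, ?, ?)",
        [(0, "fetch_order", '"order-7"'), (1, "charge_card", '"paid"')],
    )

replayed = recorded_output("wf-1", 1)  # step 1 already ran: reuse its output
missing = recorded_output("wf-1", 2)   # step 2 has no record: it must execute

# Deleting the workflow removes its step outputs through the cascade.
with conn:
    conn.execute("DELETE FROM workflow_status WHERE workflow_uuid = 'wf-1'")
remaining = conn.execute("SELECT COUNT(*) FROM operation_outputs").fetchone()[0]
```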
notifications#
The notifications table implements an event notification queue for inter-workflow communication.
Primary Key: message_uuid (TEXT)
Columns:
- message_uuid (TEXT) - Unique identifier for the message. Can be explicitly provided via the idempotency_key parameter in send()/send_async() for idempotent message delivery, or auto-generated if not provided.
- destination_uuid (TEXT, FOREIGN KEY → workflow_status) - Target workflow UUID
- topic (TEXT) - Notification topic for filtering
- message (TEXT) - Message content
- created_at_epoch_ms (BIGINT) - Message creation timestamp
- serialization (TEXT) - Serialization format metadata
- consumed (BOOLEAN, NOT NULL, DEFAULT FALSE) - Tracks whether the message has been consumed by recv()
Indexes:
- idx_workflow_topic on (destination_uuid, topic) - For efficient topic-based queries
- idx_notifications on (destination_uuid, topic) - Full index supporting observability queries and message retrieval operations
Idempotency:
The notifications table supports idempotent message insertion using the primary key constraint on message_uuid with INSERT ... ON CONFLICT DO NOTHING semantics. When send() or send_async() is called with an idempotency_key parameter, it maps directly to message_uuid to ensure exactly-once delivery. Duplicate message_uuid values are silently ignored during insertion.
Message Consumption:
The recv() method marks messages as consumed by updating consumed = TRUE rather than deleting them. This preserves message records for auditing while ensuring each message is only consumed once. The method returns the oldest unconsumed message (ordered by created_at_epoch_ms) transactionally.
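The idempotent insert and the mark-as-consumed behavior can be sketched against an in-memory SQLite table. The send and recv functions below are simplified stand-ins written for this example (they take an explicit timestamp and skip serialization); they are not the DBOS implementations.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE notifications (
        message_uuid TEXT PRIMARY KEY,
        destination_uuid TEXT NOT NULL,
        topic TEXT,
        message TEXT,
        created_at_epoch_ms INTEGER NOT NULL,
        consumed INTEGER NOT NULL DEFAULT 0
    )
    """
)


def send(destination, message, topic, created_at_ms, idempotency_key=None):
    """Insert a message; a repeated idempotency key is silently ignored."""
    message_uuid = idempotency_key or str(uuid.uuid4())
    with conn:
        conn.execute(
            "INSERT INTO notifications "
            "(message_uuid, destination_uuid, topic, message, created_at_epoch_ms) "
            "VALUES (?, ?, ?, ?, ?) ON CONFLICT (message_uuid) DO NOTHING",
            (message_uuid, destination, topic, message, created_at_ms),
        )


def recv(destination, topic):
    """Return the oldest unconsumed message and mark it consumed, or None."""
    with conn:
        row = conn.execute(
            "SELECT message_uuid, message FROM notifications "
            "WHERE destination_uuid = ? AND topic = ? AND consumed = 0 "
            "ORDER BY created_at_epoch_ms LIMIT 1",
            (destination, topic),
        ).fetchone()
        if row is None:
            return None
        conn.execute(
            "UPDATE notifications SET consumed = 1 WHERE message_uuid = ?",
            (row[0],),
        )
        return row[1]


send("wf-1", "hello", "greetings", 1000, idempotency_key="k1")
send("wf-1", "hello again", "greetings", 2000, idempotency_key="k1")  # ignored
send("wf-1", "world", "greetings", 3000)

received = [recv("wf-1", "greetings") for _ in range(3)]
stored = conn.execute("SELECT COUNT(*) FROM notifications").fetchone()[0]
```

After all messages are received, both rows are still present in the table with consumed set, which is what keeps them available for auditing.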
Note: Temporary send workflows (using TEMP_SEND_WF_NAME) are deprecated and kept only for backwards compatibility with existing workflow_status records.
workflow_events#
The workflow_events table provides a mutable key-value store for workflow state.
Primary Key: (workflow_uuid, key)
Columns:
- workflow_uuid (TEXT, FOREIGN KEY → workflow_status)
- key (TEXT) - Event key
- value (TEXT) - Event value
- serialization (TEXT) - Serialization format metadata
workflow_events_history#
The workflow_events_history table maintains an immutable version of workflow events for backwards compatibility.
Primary Key: (workflow_uuid, key, function_id)
Columns:
- workflow_uuid (TEXT, FOREIGN KEY → workflow_status)
- key (TEXT) - Event key
- value (TEXT) - Event value
- function_id (INTEGER) - Tracks which function set the event
- serialization (TEXT) - Serialization format metadata
streams#
The streams table stores ordered streams of values for workflows.
Primary Key: (workflow_uuid, key, offset)
Columns:
- workflow_uuid (TEXT, FOREIGN KEY → workflow_status)
- key (TEXT) - Stream identifier
- value (TEXT) - Stream value
- offset (INTEGER) - Position in stream
- function_id (INTEGER) - Function that added this value
- serialization (TEXT) - Serialization format metadata
Workflow Communication Observability#
The SystemDB class provides methods to query and inspect workflow communications for debugging and monitoring purposes. These methods allow you to retrieve events, notifications, and streams associated with a specific workflow.
get_all_events()#
The get_all_events(workflow_id: str) method retrieves all events associated with a workflow from the workflow_events table. It returns a dictionary mapping event keys to their deserialized values.
Returns: Dict[str, Any] - Dictionary of event key-value pairs
Use Cases:
- Inspecting workflow state set via events
- Debugging event-based workflow logic
- Auditing workflow event history
get_all_notifications()#
The get_all_notifications(workflow_id: str) method retrieves all notifications sent to a workflow from the notifications table. It returns a list of notification records ordered by creation timestamp.
Returns: List[NotificationInfo] where each NotificationInfo contains:
- topic (Optional[str]) - Notification topic, or None for default topic
- message (Any) - Deserialized message content
- created_at_epoch_ms (int) - Message creation timestamp in milliseconds
- consumed (bool) - Whether the message has been consumed by recv()
Use Cases:
- Monitoring inter-workflow communication
- Debugging notification delivery and consumption
- Auditing message flow between workflows
get_all_stream_entries()#
The get_all_stream_entries(workflow_id: str) method retrieves all stream entries for a workflow from the streams table. It returns a dictionary mapping stream keys to lists of deserialized values ordered by offset.
Returns: Dict[str, List[Any]] - Dictionary mapping stream keys to ordered lists of values
Use Cases:
- Inspecting stream data produced by workflows
- Debugging stream-based workflow patterns
- Analyzing workflow data flow
Note: Stream closed sentinel values (__DBOS_STREAM_CLOSED__) are automatically filtered out from the results.
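The shape of the returned value can be sketched with a plain Python function that groups (key, offset, value) rows, orders them by offset, and drops the closed-stream sentinel. The function name and the row layout are assumptions made for this example, and values are treated as already deserialized.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

STREAM_CLOSED_SENTINEL = "__DBOS_STREAM_CLOSED__"


def group_stream_rows(rows: Iterable[Tuple[str, int, Any]]) -> Dict[str, List[Any]]:
    """Group (key, offset, value) rows by key, ordered by offset.

    Sentinel values are skipped; in this sketch a stream that holds only
    the sentinel does not appear in the result at all.
    """
    grouped: Dict[str, List[Any]] = defaultdict(list)
    for key, _offset, value in sorted(rows, key=lambda r: (r[0], r[1])):
        if value == STREAM_CLOSED_SENTINEL:
            continue
        grouped[key].append(value)
    return dict(grouped)


rows = [
    ("logs", 1, "b"),
    ("logs", 0, "a"),
    ("logs", 2, STREAM_CLOSED_SENTINEL),
    ("metrics", 0, 1),
]
entries = group_stream_rows(rows)
```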
workflow_schedules#
The workflow_schedules table stores scheduled/recurring workflow configurations.
Primary Key: schedule_id (TEXT)
Columns:
- schedule_id (TEXT) - Unique schedule identifier
- schedule_name (TEXT, UNIQUE) - Unique human-readable schedule name
- workflow_name (TEXT) - Fully-qualified workflow function name to invoke
- workflow_class_name (TEXT, nullable) - Fully-qualified class name for static class method workflows
- schedule (TEXT) - Cron expression specifying execution timing (supports 6 fields including seconds)
- status (TEXT) - Current status (default 'ACTIVE')
- context (TEXT) - Serialized context object passed to each workflow invocation
- last_fired_at (TEXT, nullable) - Timestamp of when this schedule last executed successfully. Used for tracking execution history and enabling automatic backfill.
- automatic_backfill (BOOLEAN, NOT NULL, DEFAULT FALSE) - Whether to automatically backfill missed executions on startup. When true, the scheduler will execute any missed schedule instances when it starts up.
- cron_timezone (TEXT, nullable) - IANA timezone name (e.g., "America/New_York") for evaluating the cron expression. If NULL, uses UTC.
- queue_name (TEXT, nullable) - Optional name of the declared queue to enqueue scheduled workflows to. If NULL, scheduled workflows use the internal queue (_dbos_sys_internal).
application_versions#
The application_versions table tracks application versions and their timestamps, enabling version management and determining the latest application version.
Primary Key: version_id (TEXT)
Columns:
- version_id (TEXT) - Unique identifier for the version (UUID)
- version_name (TEXT, NOT NULL, UNIQUE) - Human-readable version name
- version_timestamp (BIGINT, NOT NULL) - Timestamp marking when this version was set as latest (milliseconds since epoch). Updated by set_latest_application_version() to mark a version as the current latest.
- created_at (BIGINT, NOT NULL) - Immutable timestamp when the version was first registered (milliseconds since epoch). Set once when the version is created and never changed.
Purpose:
This table stores all registered application versions with their associated timestamps. The latest version is determined by the highest version_timestamp value. Versions are automatically registered in the database when an executor starts. Application version is computed by hashing workflow source code (MD5), or defaults to 'DEFAULT_VERSION' if source code inspection fails (with a warning). Users can manually set a custom version through the application_version field in DBOSConfig for consistent versioning in environments where source code inspection might not work reliably. Executors print a warning on startup if they are not running the latest version. Scheduled workflows are always enqueued to the latest application version.
The version can be managed through the DBOS API:
- list_application_versions() - Returns all versions, newest first
- get_latest_application_version() - Returns the latest version
- set_latest_application_version(version_name) - Sets a version as latest by updating its version_timestamp to now (does not change created_at)
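The relationship between version_timestamp and created_at can be illustrated with a minimal SQLite sketch. The helpers register_version, set_latest, and list_versions are hypothetical stand-ins that take explicit timestamps; they only mirror the behavior described above and are not the DBOS functions themselves.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE application_versions (
        version_id TEXT PRIMARY KEY,
        version_name TEXT NOT NULL UNIQUE,
        version_timestamp INTEGER NOT NULL,
        created_at INTEGER NOT NULL
    )
    """
)


def register_version(name, now_ms):
    """Register a version once; re-registering an existing name is a no-op."""
    with conn:
        conn.execute(
            "INSERT INTO application_versions VALUES (?, ?, ?, ?) "
            "ON CONFLICT (version_name) DO NOTHING",
            (str(uuid.uuid4()), name, now_ms, now_ms),
        )


def set_latest(name, now_ms):
    """Mark a version as latest by bumping version_timestamp only."""
    with conn:
        conn.execute(
            "UPDATE application_versions SET version_timestamp = ? "
            "WHERE version_name = ?",
            (now_ms, name),
        )


def list_versions():
    """Return (name, version_timestamp, created_at) rows, newest first."""
    return conn.execute(
        "SELECT version_name, version_timestamp, created_at "
        "FROM application_versions ORDER BY version_timestamp DESC"
    ).fetchall()


register_version("v1", 1000)
register_version("v2", 2000)
latest_before = list_versions()[0][0]

set_latest("v1", 3000)  # roll back to v1 without touching created_at
versions = list_versions()
```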
Default Values:
Both version_timestamp and created_at are set to the current time when a version is first registered:
PostgreSQL: (EXTRACT(epoch FROM now()) * 1000.0)::bigint
SQLite: CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)
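The SQLite expression converts a Julian day number to milliseconds since the Unix epoch, where 2440587.5 is the Julian day of 1970-01-01 00:00:00 UTC. A quick check with Python's built-in sqlite3 module, comparing it against the system clock, looks like this:

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")

# Julian day number of the Unix epoch, as used by the default expression.
(epoch_julian_day,) = conn.execute(
    "SELECT julianday('1970-01-01 00:00:00')"
).fetchone()

# Current time in epoch milliseconds, computed by SQLite.
(db_ms,) = conn.execute(
    "SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)"
).fetchone()

# The same quantity computed in Python; the two should agree closely.
py_ms = int(time.time() * 1000)
drift_ms = abs(py_ms - db_ms)
```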
queues#
The queues table stores database-backed queue configurations and metadata. Database-backed queues enable dynamic queue modification and observability, and are the recommended approach for queue management. In-memory queues (not stored in the database) remain supported for backward compatibility but are deprecated.
Primary Key: queue_id (TEXT)
Columns:
- queue_id (TEXT) - Unique queue identifier (UUID)
- name (TEXT, NOT NULL, UNIQUE) - Queue name used to reference the queue in API calls
- concurrency (INTEGER, nullable) - Global concurrency limit across all workers
- worker_concurrency (INTEGER, nullable) - Per-worker concurrency limit
- rate_limit_max (INTEGER, nullable) - Maximum number of workflows that can start within the rate limit period
- rate_limit_period_sec (REAL/DOUBLE PRECISION, nullable) - Rate limit period in seconds
- priority_enabled (BOOLEAN, NOT NULL, DEFAULT FALSE) - Whether workflows in this queue are ordered by priority
- partition_queue (BOOLEAN, NOT NULL, DEFAULT FALSE) - Whether the queue is partitioned
- polling_interval_sec (REAL/DOUBLE PRECISION, NOT NULL, DEFAULT 1.0) - Interval in seconds between queue polling operations
- created_at (BIGINT, NOT NULL) - Timestamp when the queue was first registered (milliseconds since epoch)
- updated_at (BIGINT, NOT NULL) - Timestamp of the last configuration update (milliseconds since epoch)
Purpose:
Introduced in migration 21, this table supports database-backed queues where configuration is persisted in the database rather than stored only in memory. This enables:
- Dynamic reconfiguration of queue parameters (concurrency, rate limits, etc.) at runtime without restarting workers
- Centralized queue management across distributed deployments
- Observability of queue configurations through the DBOS client or dashboard
- Queue configuration that survives application restarts
Database-backed queues are registered via DBOS.register_queue() and retrieved via DBOS.retrieve_queue(). Workers automatically reload queue configurations on each polling cycle, picking up changes without requiring a restart.
event_dispatch_kv#
The event_dispatch_kv table provides key-value storage for event dispatch. Note: This table only appears in PostgreSQL migrations and is not included in the SQLAlchemy schema definition.
Primary Key: (service_name, workflow_fn_name, key)
Columns:
- service_name (TEXT)
- workflow_fn_name (TEXT)
- key (TEXT)
- value (TEXT)
- update_seq (NUMERIC(38,0))
- update_time (NUMERIC(38,15))
Schema Migration System#
Migration Tracking#
DBOS uses a custom migration system tracked through a dbos_migrations table that stores a single version number representing the highest applied migration.
PostgreSQL: CREATE TABLE "{schema}".dbos_migrations (version BIGINT NOT NULL PRIMARY KEY)
SQLite: CREATE TABLE dbos_migrations (version INTEGER NOT NULL PRIMARY KEY)
Migration Execution#
Migrations are automatically executed during DBOS initialization when DBOS.launch() is called. The run_dbos_migrations() function:
- Queries the current version from dbos_migrations
- Iterates through all available migrations, skipping already applied ones
- Executes each pending migration SQL script in order
- Updates the version number in dbos_migrations after each successful migration
Each migration is executed in its own separate database transaction, so migrations are committed independently. This reduces contention when running migrations against an active database.
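The version-tracking loop can be sketched as follows. This is a simplified illustration against SQLite, not the actual run_dbos_migrations() code: the function name run_migrations, the example migration statements, and the assumption that each migration is a single SQL statement are all choices made for this example.

```python
import sqlite3
from typing import List


def run_migrations(conn: sqlite3.Connection, migrations: List[str]) -> int:
    """Apply pending migrations in order; return how many were applied.

    Assumes `conn` is in autocommit mode (isolation_level=None) so that
    each migration runs inside its own explicit transaction.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS dbos_migrations "
        "(version INTEGER NOT NULL PRIMARY KEY)"
    )
    row = conn.execute("SELECT version FROM dbos_migrations").fetchone()
    current = row[0] if row else 0
    applied = 0
    for version, sql in enumerate(migrations, start=1):
        if version <= current:
            continue  # already applied
        conn.execute("BEGIN")
        try:
            conn.execute(sql)
            if current == 0:
                conn.execute(
                    "INSERT INTO dbos_migrations (version) VALUES (?)", (version,)
                )
            else:
                conn.execute("UPDATE dbos_migrations SET version = ?", (version,))
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
        current = version
        applied += 1
    return applied


# Illustrative migrations only; the real scripts are far larger.
MIGRATIONS = [
    "CREATE TABLE workflow_status (workflow_uuid TEXT PRIMARY KEY, status TEXT)",
    "ALTER TABLE workflow_status ADD COLUMN queue_partition_key TEXT",
]

conn = sqlite3.connect(":memory:", isolation_level=None)
first_run = run_migrations(conn, MIGRATIONS)
second_run = run_migrations(conn, MIGRATIONS)  # nothing left to apply
third_run = run_migrations(
    conn,
    MIGRATIONS
    + ["CREATE INDEX workflow_status_status_index ON workflow_status (status)"],
)
version = conn.execute("SELECT version FROM dbos_migrations").fetchone()[0]
```

Because the version row is updated in the same transaction as the migration it records, a failed migration leaves the stored version pointing at the last one that succeeded.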
Concurrent Migration Safety#
For PostgreSQL, the migration system uses an early-exit optimization combined with a PostgreSQL advisory lock to handle concurrent migration execution efficiently and safely.
Early Exit Optimization:
Before acquiring the advisory lock, the system checks whether migrations actually need to be run via the should_migrate() function. This function verifies:
- Whether the schema exists
- Whether the dbos_migrations table exists
- Whether the current migration version is up to date
If all checks pass (the schema is already at the latest version), the function returns immediately without acquiring the advisory lock at all. This optimization avoids unnecessary lock contention and improves startup performance for applications where the schema is already current.
Advisory Lock (Conditional):
The advisory lock is only acquired when migrations actually need to be performed. When should_migrate() returns true, the system attempts to acquire advisory lock ID 1234567890 using pg_try_advisory_lock(). The system polls for the lock once per second with a 30-second timeout. If the lock cannot be acquired within this timeout, the system logs a warning and proceeds to run migrations without the lock rather than blocking indefinitely. The lock is released in a finally block using pg_advisory_unlock() after migrations complete.
This mechanism prevents contention when multiple DBOS instances launch concurrently on a fresh database. Only one instance performs the migration at a time, while other instances wait for the lock. The timeout prevents startup from hanging if a previous process crashed while holding the lock and Postgres is slow to release it. This is handled automatically during system database initialization and requires no user configuration or intervention.
Fallback behavior: If the PostgreSQL implementation does not support advisory locks (some managed services may not), the system gracefully falls back to running migrations without the lock. The migration logic catches exceptions from the pg_try_advisory_lock() call and proceeds with migrations regardless, ensuring compatibility across different PostgreSQL environments.
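The control flow described in this section (early exit, polling for the lock with a timeout, falling back when locking fails, and releasing in a finally block) can be sketched independently of any database. In the sketch below the database calls are passed in as plain callables, so the function name and its parameters are hypothetical; with PostgreSQL, try_lock would issue pg_try_advisory_lock(1234567890) and unlock would issue pg_advisory_unlock(1234567890).

```python
import time
from typing import Callable


def migrate_with_lock(
    should_migrate: Callable[[], bool],
    try_lock: Callable[[], bool],
    unlock: Callable[[], None],
    migrate: Callable[[], None],
    timeout_sec: float = 30.0,
    poll_interval_sec: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Return 'skipped', 'locked', or 'unlocked' depending on the path taken."""
    if not should_migrate():
        return "skipped"  # early exit: never touches the lock
    acquired = False
    deadline = time.monotonic() + timeout_sec
    try:
        while True:
            try:
                acquired = bool(try_lock())
            except Exception:
                break  # advisory locks unavailable: run without the lock
            if acquired or time.monotonic() >= deadline:
                break  # got the lock, or gave up waiting
            sleep(poll_interval_sec)
        migrate()
        return "locked" if acquired else "unlocked"
    finally:
        if acquired:
            unlock()


# Simulated run: the lock is busy twice, then becomes available.
events = []
answers = iter([False, False, True])
outcome = migrate_with_lock(
    should_migrate=lambda: True,
    try_lock=lambda: next(answers),
    unlock=lambda: events.append("unlock"),
    migrate=lambda: events.append("migrate"),
    sleep=lambda _seconds: events.append("wait"),
)
```

Injecting sleep and the lock functions keeps the sketch testable without a running PostgreSQL instance.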
PostgreSQL Migrations#
23 migrations are defined for PostgreSQL via get_dbos_migrations():
1. Initial schema creation - Creates all core tables, indexes, and optional LISTEN/NOTIFY triggers
2. Add queue_partition_key column
3. Add queue status index
4. Add forked_from column and index
5. Add operation timing columns
6. Create workflow_events_history and add function_id to streams
7. Add owner_xid
8. Add parent_workflow_id and index
9. Create workflow_schedules table
10. Add primary key to notifications - Adds a primary key constraint to the notifications table on message_uuid. The migration runner checks information_schema.table_constraints in Python code before executing the SQL to determine if a primary key already exists on the table, and skips the migration entirely if one is found. This approach ensures compatibility with databases like CockroachDB that don't support PostgreSQL's DO $$ ... END $$ syntax. The SQL itself is a simple ALTER TABLE statement: ALTER TABLE "{schema}".notifications ADD PRIMARY KEY (message_uuid);
11. Add serialization columns to all tables
12. Add consumed column and idx_notifications index to notifications table
13. Create application_versions table with version_id, version_name, version_timestamp, and created_at columns
14. Create PostgreSQL stored functions for SQL-based workflow operations:
    - enqueue_workflow - Direct SQL-based workflow enqueueing with the following parameters:
        - workflow_name (TEXT, required) - The name of the workflow to enqueue
        - queue_name (TEXT, required) - The queue name for the workflow
        - positional_args (JSON[], optional, default empty array) - Positional arguments for the workflow
        - named_args (JSON, optional, default empty object) - Named arguments for the workflow
        - class_name (TEXT, optional) - The class name if using class-based workflows
        - config_name (TEXT, optional) - Configuration name for the workflow
        - workflow_id (TEXT, optional) - Custom workflow ID (auto-generated if not provided)
        - app_version (TEXT, optional) - Application version string
        - timeout_ms (BIGINT, optional) - Timeout in milliseconds
        - deadline_epoch_ms (BIGINT, optional) - Absolute deadline timestamp
        - deduplication_id (TEXT, optional) - ID for deduplication within a queue
        - priority (INTEGER, optional, default 0) - Priority for queue ordering
        - queue_partition_key (TEXT, optional) - Partition key for queue distribution

      Returns: workflow_id (TEXT) - The UUID of the enqueued workflow

      The function inserts workflows with 'ENQUEUED' status, validates parameters, handles deduplication, and ensures workflow metadata consistency on conflicts.
    - send_message - Direct SQL-based message sending to workflows:
        - destination_id (TEXT, required) - The workflow UUID to send the message to
        - message (JSON, required) - The message payload
        - topic (TEXT, optional, default 'null__topic') - Message topic for filtering
        - message_id (TEXT, optional, auto-generated) - Unique message identifier

      Returns: VOID

      The function inserts notifications into the notifications table and handles deduplication via message_id.
15. Add scheduler enhancement columns to workflow_schedules table: last_fired_at (TEXT), automatic_backfill (BOOLEAN), and cron_timezone (TEXT)
16. Add delayed workflow support: delay_until_epoch_ms column to workflow_status table and idx_workflow_status_delayed partial index for efficiently querying delayed workflows ready to transition to ENQUEUED
17. Add queue_name column to workflow_schedules table
18. Add was_forked_from column to workflow_status table
19. Create idx_operation_outputs_completed_at_function_name index on operation_outputs table with columns (completed_at_epoch_ms, function_name)
20. Set the search_path for the enqueue_workflow and send_message functions to pg_catalog and pg_temp for security purposes. Also sets search_path for notifications_function and workflow_events_function when LISTEN/NOTIFY is enabled.
21. Create queues table with columns: queue_id (primary key, auto-generated UUID), name (unique), concurrency, worker_concurrency, rate_limit_max, rate_limit_period_sec, priority_enabled, partition_queue, polling_interval_sec, created_at, updated_at
22. Optimize idx_workflow_status_forked_from index by making it a partial index (WHERE forked_from IS NOT NULL)
23. Optimize idx_workflow_status_parent_workflow_id index by making it a partial index (WHERE parent_workflow_id IS NOT NULL)
SQLite Migrations#
23 migrations are defined for SQLite (no migration 10, 14, or 20), parallel to PostgreSQL with SQLite-specific syntax adaptations. The SQLite schema does not include the event_dispatch_kv table. The later migrations mirror their PostgreSQL counterparts:
- Migration 13 creates the application_versions table with the same columns as PostgreSQL.
- Migration 15 adds the scheduler enhancement columns: last_fired_at (TEXT), automatic_backfill (BOOLEAN), and cron_timezone (TEXT).
- Migration 16 adds delayed workflow support: the delay_until_epoch_ms column and the idx_workflow_status_delayed index.
- Migration 17 adds the queue_name column to the workflow_schedules table.
- Migration 18 adds the was_forked_from column to the workflow_status table.
- Migration 19 creates the idx_operation_outputs_completed_at_function_name index on the operation_outputs table with columns (completed_at_epoch_ms, function_name).
- Migration 21 creates the queues table with the same structure as PostgreSQL, using SQLite-specific syntax for timestamps and UUID generation.
- Migrations 22 and 23 turn idx_workflow_status_forked_from and idx_workflow_status_parent_workflow_id into partial indexes (WHERE forked_from IS NOT NULL and WHERE parent_workflow_id IS NOT NULL, respectively).
Note: SQLite does not have an equivalent for PostgreSQL migration 14 (stored functions enqueue_workflow and send_message) or migration 20 (setting search_path on database functions), as these are PostgreSQL-specific features.
Limitations#
DBOS does not support rollback or downgrade functionality. The migration system only supports forward migrations with no logic for reverting to previous schema versions.
Configuration#
System Database URL#
The system database is configured via the system_database_url parameter in DBOSConfig:
PostgreSQL: postgresql://user:password@host:port/dbname_dbos_sys
SQLite: sqlite:///myapp.sqlite
If not provided, defaults to sqlite:///{name}.sqlite.
If only an application database URL is provided, DBOS automatically constructs the system database URL by appending _dbos_sys to the PostgreSQL database name or using the same SQLite file.
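The derivation rule can be approximated with the standard library. This is only a sketch of the naming convention described above; the function name derive_system_database_url is hypothetical, and the real logic in DBOS may handle more cases (for example, driver-specific URL forms).

```python
from urllib.parse import urlsplit, urlunsplit


def derive_system_database_url(app_database_url: str) -> str:
    """Hypothetical helper: build a system database URL from an application URL."""
    parts = urlsplit(app_database_url)
    if parts.scheme.startswith("sqlite"):
        # SQLite: the system tables live in the same file.
        return app_database_url
    # PostgreSQL: append the _dbos_sys suffix to the database name.
    db_name = parts.path.lstrip("/")
    return urlunsplit(parts._replace(path=f"/{db_name}_dbos_sys"))


pg_url = derive_system_database_url(
    "postgresql://user:password@localhost:5432/myapp"
)
pg_url_with_query = derive_system_database_url(
    "postgresql://user:password@localhost:5432/myapp?sslmode=require"
)
sqlite_url = derive_system_database_url("sqlite:///myapp.sqlite")
```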
Schema Name#
The dbos_system_schema parameter specifies the schema name for system tables (PostgreSQL only). Default: "dbos".
Connection Pooling#
sys_db_pool_size controls the system database connection pool size. Default: 20 connections.
{
"pool_size": 20,
"max_overflow": 0,
"pool_timeout": 30,
"pool_pre_ping": True,
"connect_args": {
"application_name": "dbos_transact",
"connect_timeout": 10
}
}
NullPool Support#
DBOS supports disabling connection pooling using SQLAlchemy's NullPool. When poolclass is set to NullPool in db_engine_kwargs, DBOS automatically removes the standard pool-related parameters since they are incompatible with NullPool:
- pool_timeout
- max_overflow
- pool_size
- pool_pre_ping
NullPool creates a new database connection for each request and closes it immediately after use, effectively disabling connection pooling. This is useful for scenarios where connection pooling is not desired, such as:
- Using external connection poolers (e.g., PgBouncer)
- Serverless environments
- Testing scenarios
The NullPool behavior applies to both the application database and system database configurations.
Example configuration (Python):
from sqlalchemy.pool import NullPool
config["db_engine_kwargs"] = {"poolclass": NullPool}
Example configuration (YAML):
database:
  db_engine_kwargs:
    poolclass: NullPool
Configuration File Example#
name: "my-application"
system_database_url: "postgresql://user:pass@localhost/myapp_dbos_sys"
dbos_system_schema: "dbos"
database:
  sys_db_pool_size: 30
  db_engine_kwargs:
    pool_timeout: 60
Environment Variables#
For DBOS Cloud deployments, configuration is overridden by environment variables:
- DBOS_SYSTEM_DATABASE_URL - System database connection string
- DBOS_DATABASE_URL - Application database connection string
Database Backends#
PostgreSQL#
PostgreSQL is the recommended backend for production. The system:
- Creates the database if it doesn't exist
- Creates the DBOS schema
- Uses the postgresql+psycopg driver
- Supports native LISTEN/NOTIFY for real-time notifications
SQLite#
SQLite is supported for development and testing. The system:
- Filters PostgreSQL-specific connection arguments
- Applies SQLite-specific schema with adapted syntax
- Does not include the event_dispatch_kv table
Workflow State Management#
Initialization#
When a workflow starts, the init_workflow method calls _insert_workflow_status to create or update the workflow record using PostgreSQL upsert semantics (INSERT ... ON CONFLICT DO UPDATE).
Outcome Recording#
When a workflow completes, update_workflow_outcome updates the status, output, or error fields in workflow_status.
Step Results#
Individual operation results are recorded by record_operation_result, which calls _record_operation_result_txn to insert into operation_outputs using INSERT ... ON CONFLICT DO UPDATE. When a conflict occurs on the composite primary key (workflow_uuid, function_id), the system updates the completed_at_epoch_ms timestamp to ensure the completion time is properly recorded even when the same operation result is inserted multiple times due to retries or concurrent workflow execution.
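The conflict handling can be demonstrated with SQLite, which supports the same INSERT ... ON CONFLICT DO UPDATE form. The table below carries only a subset of the operation_outputs columns, and record_result is a hypothetical stand-in rather than the DBOS method.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE operation_outputs (
        workflow_uuid TEXT NOT NULL,
        function_id INTEGER NOT NULL,
        function_name TEXT,
        output TEXT,
        completed_at_epoch_ms INTEGER,
        PRIMARY KEY (workflow_uuid, function_id)
    )
    """
)


def record_result(workflow_uuid, function_id, function_name, output, completed_at_ms):
    """Insert a step result; on a repeat, refresh only the completion time."""
    with conn:
        conn.execute(
            """
            INSERT INTO operation_outputs
                (workflow_uuid, function_id, function_name, output,
                 completed_at_epoch_ms)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT (workflow_uuid, function_id) DO UPDATE
                SET completed_at_epoch_ms = excluded.completed_at_epoch_ms
            """,
            (workflow_uuid, function_id, function_name, output, completed_at_ms),
        )


record_result("wf-1", 0, "charge_card", '"ok"', 1000)
# A retry records the same step again; the stored output is left untouched.
record_result("wf-1", 0, "charge_card", '"different"', 2000)

stored = conn.execute(
    "SELECT output, completed_at_epoch_ms FROM operation_outputs"
).fetchall()
```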
Queue Management#
Workflow queues are managed through the queue_name field in workflow_status. The start_queued_workflows method:
- Retrieves workflows with status ENQUEUED
- Orders by priority (ascending) and creation time (FIFO)
- Updates status to PENDING when dequeuing
- Enforces deduplication via the unique constraint on (queue_name, deduplication_id)
Workflows can be enqueued with a delay by setting the delay_until_epoch_ms field. Delayed workflows initially have DELAYED status and do not execute until the delay period expires. The transition_delayed_workflows method periodically checks for DELAYED workflows whose delay_until_epoch_ms timestamp has passed and transitions them to ENQUEUED status, making them eligible for dequeuing and execution.
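The ordering and status transitions described above can be sketched with SQLite. This is a deliberately reduced model: it ignores concurrency limits, rate limits, and partitioning, and the helper names transition_delayed and dequeue are hypothetical rather than the DBOS method signatures.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE workflow_status (
        workflow_uuid TEXT PRIMARY KEY,
        status TEXT NOT NULL,
        queue_name TEXT,
        priority INTEGER NOT NULL DEFAULT 0,
        created_at INTEGER NOT NULL,
        delay_until_epoch_ms INTEGER
    )
    """
)
# Partial index covering only the rows the delay check needs to scan.
conn.execute(
    "CREATE INDEX idx_workflow_status_delayed "
    "ON workflow_status (delay_until_epoch_ms) WHERE status = 'DELAYED'"
)


def transition_delayed(now_ms):
    """Move DELAYED workflows whose delay has passed to ENQUEUED; return count."""
    with conn:
        cur = conn.execute(
            "UPDATE workflow_status SET status = 'ENQUEUED' "
            "WHERE status = 'DELAYED' AND delay_until_epoch_ms < ?",
            (now_ms,),
        )
        return cur.rowcount


def dequeue(queue_name, limit):
    """Pick ENQUEUED workflows by priority, then age, and mark them PENDING."""
    with conn:
        ids = [
            row[0]
            for row in conn.execute(
                "SELECT workflow_uuid FROM workflow_status "
                "WHERE queue_name = ? AND status = 'ENQUEUED' "
                "ORDER BY priority ASC, created_at ASC LIMIT ?",
                (queue_name, limit),
            )
        ]
        conn.executemany(
            "UPDATE workflow_status SET status = 'PENDING' WHERE workflow_uuid = ?",
            [(workflow_id,) for workflow_id in ids],
        )
    return ids


with conn:
    conn.executemany(
        "INSERT INTO workflow_status VALUES (?, ?, ?, ?, ?, ?)",
        [
            ("wf-a", "ENQUEUED", "q", 2, 100, None),
            ("wf-b", "ENQUEUED", "q", 1, 200, None),
            ("wf-c", "ENQUEUED", "q", 1, 150, None),
            ("wf-d", "DELAYED", "q", 0, 50, 5000),
        ],
    )

moved_early = transition_delayed(now_ms=1000)  # delay has not passed yet
first_batch = dequeue("q", limit=2)
moved_late = transition_delayed(now_ms=6000)   # wf-d becomes ENQUEUED
second_batch = dequeue("q", limit=5)
```

Lower priority values are dequeued first, and ties are broken by creation time, so wf-d jumps ahead of wf-a once its delay expires.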
The queue_name and queue_partition_key fields can be set not only during initial workflow creation but also when resuming or forking workflows. This provides flexibility in workflow orchestration and queue management:
- Workflow Resumption: When resuming workflows via resume_workflow() or resume_workflows(), an optional queue_name parameter can be provided to place the resumed workflow on a specific queue. If not specified, resumed workflows default to the INTERNAL_QUEUE_NAME.
- Workflow Forking: When forking workflows via fork_workflow(), optional queue_name and queue_partition_key parameters can be provided to place the forked workflow on a specific queue with a specific partition key. If queue_name is not specified, forked workflows default to the INTERNAL_QUEUE_NAME.
Recovery#
The recovery_attempts field tracks retry counts. When max recovery attempts are exceeded, the workflow status is set to MAX_RECOVERY_ATTEMPTS_EXCEEDED, effectively moving it to a "dead letter queue" state where it won't be automatically recovered.
Design Rationale#
Persistence-First Architecture#
Schedules and workflow state persist in the database and survive application restarts, distinguishing dynamic scheduling from decorator-based scheduling.
Exactly-Once Semantics#
The system implements exactly-once semantics through database upserts with conflict resolution on workflow_uuid.
Distributed Execution#
Changes to schedules propagate to all workers in distributed deployments without a restart, because DBOS periodically polls the database for updates.
Foreign Key Cascading#
ON DELETE CASCADE is used throughout to automatically clean up related records when workflows are deleted.
Separation of Concerns#
The system maintains clear separation between the system database (framework metadata) and application database (user data), each with its own connection URL, pool, and schema.
Relevant Code Files#
| File | Description | URL |
|---|---|---|
| dbos/_schemas/system_database.py | SQLAlchemy table definitions for all system tables | View |
| dbos/_migration.py | SQL DDL migration scripts for PostgreSQL and SQLite | View |
| dbos/_sys_db.py | System database operations and workflow state management | View |
| dbos/_sys_db_postgres.py | PostgreSQL-specific system database implementation | View |
| dbos/_sys_db_sqlite.py | SQLite-specific system database implementation | View |
| dbos/_dbos_config.py | Configuration management including database URLs and pooling | View |
| dbos/_dbos.py | Main DBOS class with startup and migration execution | View |
| dbos/_scheduler.py | Dynamic scheduler implementation using workflow_schedules | View |
| dbos/cli/migration.py | Programmatic migration API | View |
| tests/test_schema_migration.py | Schema migration test suite | View |
Related Topics#
- Application Database Schema - Separate schema for transaction outputs in user's application database
- Dynamic Workflow Scheduling - How the workflow_schedules table enables runtime scheduling
- Workflow Recovery - How operation_outputs enables workflow replay and recovery
- Workflow Queues - Queue implementation using workflow_status table