Type: Topic
Status: Published
Created: Feb 24, 2026
Updated: May 26, 2026
Created by: Dosu Bot
Updated by: Dosu Bot

System Database Schema#

Lead Section#

The System Database Schema in DBOS Transact Python provides persistent storage for workflow execution state, schedules, and metadata. It enables durable workflow execution with recovery, allowing workflows to survive application restarts and failures.

The system database consists of ten core tables stored in the "dbos" schema (PostgreSQL) or directly in the database (SQLite). These tables track workflow execution status, operation outputs, notifications, events, streams, scheduled workflows, application versions, and queue configurations. The schema is maintained through a versioned migration system that automatically applies incremental updates during DBOS initialization.

DBOS maintains a clear separation between the system database (framework metadata) and the application database (user data), each with its own connection URL and schema. The system database supports both PostgreSQL and SQLite backends, with PostgreSQL recommended for production use.

Core Tables#

workflow_status#

The workflow_status table is the central table for tracking all workflow executions. It stores comprehensive metadata about each workflow's lifecycle, execution state, and relationships.

Primary Key: workflow_uuid (TEXT)

Core Columns:

  • status (TEXT) - Current workflow status: PENDING, SUCCESS, ERROR, CANCELLED, ENQUEUED, DELAYED, MAX_RECOVERY_ATTEMPTS_EXCEEDED
  • name (TEXT) - Fully-qualified workflow function name
  • inputs (TEXT) - Serialized workflow input arguments
  • output (TEXT) - Serialized workflow return value
  • error (TEXT) - Serialized error information if workflow failed

Authentication & Identity:

  • authenticated_user (TEXT) - User who initiated the workflow
  • assumed_role (TEXT) - Role assumed for execution
  • authenticated_roles (TEXT) - Available roles for the user

Execution Tracking:

Application Context:

  • application_version (TEXT) - Version of the application. Automatically computed from workflow source code (MD5 hash), or defaults to 'DEFAULT_VERSION' if source code inspection fails. Can be manually set via the application_version field in DBOSConfig.
  • application_id (TEXT) - Application identifier
  • class_name (VARCHAR(255)) - Class name for static class method workflows
  • config_name (VARCHAR(255)) - Configuration name

Workflow Relationships:

Queue & Recovery:

  • queue_name (TEXT) - Queue name for queued workflows
  • deduplication_id (TEXT) - For queue deduplication
  • priority (INTEGER) - Execution priority (default 0)
  • queue_partition_key (TEXT) - Partition key for queue distribution
  • recovery_attempts (BIGINT) - Number of recovery attempts (default 0)
  • delay_until_epoch_ms (BIGINT, nullable) - Epoch timestamp in milliseconds until which a workflow should remain in DELAYED status. When the current time exceeds this value, the workflow transitions from DELAYED to ENQUEUED. NULL for workflows without a delay.
  • completed_at (BIGINT, nullable) - The UNIX epoch timestamp (in milliseconds) at which the workflow completed (SUCCESS, ERROR, or CANCELLED). NULL if the workflow has not completed.

Timeout Management:

  • workflow_timeout_ms (BIGINT) - Workflow timeout in milliseconds
  • workflow_deadline_epoch_ms (BIGINT) - Absolute deadline timestamp

Serialization:

Indexes:

Constraints:

operation_outputs#

The operation_outputs table stores the results of individual operations (steps) within workflows. This enables precise replay during workflow recovery.

Primary Key: (workflow_uuid, function_id)

Columns:

  • workflow_uuid (TEXT, FOREIGN KEY → workflow_status) - Parent workflow
  • function_id (INTEGER) - Sequential step identifier within workflow
  • function_name (TEXT) - Name of the operation function
  • output (TEXT) - Serialized operation output
  • error (TEXT) - Serialized error if operation failed
  • child_workflow_id (TEXT) - UUID of spawned child workflow (if applicable)
  • started_at_epoch_ms (BIGINT) - Operation start time
  • completed_at_epoch_ms (BIGINT) - Operation completion time
  • serialization (TEXT) - Serialization format metadata

Indexes:

Foreign Key: ON DELETE CASCADE - Automatically deletes operation outputs when parent workflow is deleted
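
The cascade behavior can be illustrated with a minimal sketch using Python's built-in sqlite3 module. The two tables below are trimmed-down stand-ins with only a few of the columns listed above, and demo_cascade is a hypothetical helper, not part of DBOS.

```python
import sqlite3


def demo_cascade() -> int:
    """Return how many operation_outputs rows remain after the parent workflow is deleted."""
    conn = sqlite3.connect(":memory:")
    # SQLite enforces foreign keys only when this pragma is enabled on the connection.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute(
        "CREATE TABLE workflow_status (workflow_uuid TEXT PRIMARY KEY, status TEXT)"
    )
    conn.execute(
        """
        CREATE TABLE operation_outputs (
            workflow_uuid TEXT NOT NULL
                REFERENCES workflow_status(workflow_uuid) ON DELETE CASCADE,
            function_id INTEGER NOT NULL,
            output TEXT,
            PRIMARY KEY (workflow_uuid, function_id)
        )
        """
    )
    conn.execute("INSERT INTO workflow_status VALUES ('wf-1', 'SUCCESS')")
    conn.executemany(
        "INSERT INTO operation_outputs VALUES ('wf-1', ?, ?)",
        [(0, '"a"'), (1, '"b"')],
    )
    # Deleting the parent row removes both step rows through the cascade.
    conn.execute("DELETE FROM workflow_status WHERE workflow_uuid = 'wf-1'")
    (remaining,) = conn.execute("SELECT COUNT(*) FROM operation_outputs").fetchone()
    conn.close()
    return remaining
```

Calling demo_cascade() should report zero remaining step rows once the parent workflow row is gone.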

notifications#

The notifications table implements an event notification queue for inter-workflow communication.

Primary Key: message_uuid (TEXT)

Columns:

  • message_uuid (TEXT) - Unique identifier for the message. Can be explicitly provided via the idempotency_key parameter in send()/send_async() for idempotent message delivery, or auto-generated if not provided.
  • destination_uuid (TEXT, FOREIGN KEY → workflow_status) - Target workflow UUID
  • topic (TEXT) - Notification topic for filtering
  • message (TEXT) - Message content
  • created_at_epoch_ms (BIGINT) - Message creation timestamp
  • serialization (TEXT) - Serialization format metadata
  • consumed (BOOLEAN, NOT NULL, DEFAULT FALSE) - Tracks whether the message has been consumed by recv()

Indexes:

  • idx_workflow_topic on (destination_uuid, topic) - For efficient topic-based queries
  • idx_notifications on (destination_uuid, topic) - Full index supporting observability queries and message retrieval operations

Idempotency:

The notifications table supports idempotent message insertion using the primary key constraint on message_uuid with INSERT ... ON CONFLICT DO NOTHING semantics. When send() or send_async() is called with an idempotency_key parameter, it maps directly to message_uuid to ensure exactly-once delivery. Duplicate message_uuid values are silently ignored during insertion.
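
As a rough illustration of the idempotent insert described above, the following sketch uses sqlite3 with a simplified notifications table. The send function here is a hypothetical stand-in that only mirrors the idempotency_key to message_uuid mapping; it is not the DBOS implementation.

```python
import sqlite3
import time
import uuid
from typing import Optional


def create_notifications_table(conn: sqlite3.Connection) -> None:
    # Simplified stand-in for the notifications table (no foreign key, no serialization column).
    conn.execute(
        """
        CREATE TABLE notifications (
            message_uuid TEXT PRIMARY KEY,
            destination_uuid TEXT NOT NULL,
            topic TEXT,
            message TEXT NOT NULL,
            created_at_epoch_ms INTEGER NOT NULL,
            consumed INTEGER NOT NULL DEFAULT 0
        )
        """
    )


def send(
    conn: sqlite3.Connection,
    destination_uuid: str,
    message: str,
    topic: Optional[str] = None,
    idempotency_key: Optional[str] = None,
) -> str:
    """Insert a message; a repeated idempotency_key is silently ignored."""
    message_uuid = idempotency_key if idempotency_key is not None else str(uuid.uuid4())
    conn.execute(
        """
        INSERT INTO notifications
            (message_uuid, destination_uuid, topic, message, created_at_epoch_ms)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (message_uuid) DO NOTHING
        """,
        (message_uuid, destination_uuid, topic, message, int(time.time() * 1000)),
    )
    conn.commit()
    return message_uuid
```

Sending twice with the same idempotency_key leaves a single row holding the first message, while sends without a key each get a fresh UUID.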

Message Consumption:

The recv() method marks messages as consumed by updating consumed = TRUE rather than deleting them. This preserves message records for auditing while ensuring each message is only consumed once. The method returns the oldest unconsumed message (ordered by created_at_epoch_ms) transactionally.
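
A minimal sketch of this consume-by-marking pattern, again using sqlite3 and a simplified table. The recv function below is a hypothetical stand-in, and the explicit BEGIN IMMEDIATE / COMMIT pair is an assumption about how to make the select and update atomic in SQLite, not a description of the DBOS code.

```python
import sqlite3
from typing import Optional


def open_db() -> sqlite3.Connection:
    # isolation_level=None turns off implicit transactions so BEGIN/COMMIT are explicit.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        """
        CREATE TABLE notifications (
            message_uuid TEXT PRIMARY KEY,
            destination_uuid TEXT NOT NULL,
            topic TEXT NOT NULL,
            message TEXT NOT NULL,
            created_at_epoch_ms INTEGER NOT NULL,
            consumed INTEGER NOT NULL DEFAULT 0
        )
        """
    )
    return conn


def recv(conn: sqlite3.Connection, destination_uuid: str, topic: str) -> Optional[str]:
    """Return the oldest unconsumed message and mark it consumed, or None if there is none."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            """
            SELECT message_uuid, message FROM notifications
            WHERE destination_uuid = ? AND topic = ? AND consumed = 0
            ORDER BY created_at_epoch_ms ASC
            LIMIT 1
            """,
            (destination_uuid, topic),
        ).fetchone()
        if row is not None:
            # Mark rather than delete, so the record stays available for auditing.
            conn.execute(
                "UPDATE notifications SET consumed = 1 WHERE message_uuid = ?",
                (row[0],),
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return None if row is None else row[1]
```

Repeated calls drain messages in creation order and then return None, while the rows themselves remain in the table with consumed set.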

Note: Temporary send workflows (using TEMP_SEND_WF_NAME) are deprecated and kept only for backwards compatibility with existing workflow_status records.

workflow_events#

The workflow_events table provides a mutable key-value store for workflow state.

Primary Key: (workflow_uuid, key)

Columns:

  • workflow_uuid (TEXT, FOREIGN KEY → workflow_status)
  • key (TEXT) - Event key
  • value (TEXT) - Event value
  • serialization (TEXT) - Serialization format metadata

workflow_events_history#

The workflow_events_history table maintains an immutable version of workflow events for backwards compatibility.

Primary Key: (workflow_uuid, key, function_id)

Columns:

  • workflow_uuid (TEXT, FOREIGN KEY → workflow_status)
  • key (TEXT) - Event key
  • value (TEXT) - Event value
  • function_id (INTEGER) - Tracks which function set the event
  • serialization (TEXT) - Serialization format metadata

streams#

The streams table stores ordered streams of values for workflows.

Primary Key: (workflow_uuid, key, offset)

Columns:

  • workflow_uuid (TEXT, FOREIGN KEY → workflow_status)
  • key (TEXT) - Stream identifier
  • value (TEXT) - Stream value
  • offset (INTEGER) - Position in stream
  • function_id (INTEGER) - Function that added this value
  • serialization (TEXT) - Serialization format metadata

Workflow Communication Observability#

The SystemDB class provides methods to query and inspect workflow communications for debugging and monitoring purposes. These methods allow you to retrieve events, notifications, and streams associated with a specific workflow.

get_all_events()#

The get_all_events(workflow_id: str) method retrieves all events associated with a workflow from the workflow_events table. It returns a dictionary mapping event keys to their deserialized values.

Returns: Dict[str, Any] - Dictionary of event key-value pairs

Use Cases:

  • Inspecting workflow state set via events
  • Debugging event-based workflow logic
  • Auditing workflow event history

get_all_notifications()#

The get_all_notifications(workflow_id: str) method retrieves all notifications sent to a workflow from the notifications table. It returns a list of notification records ordered by creation timestamp.

Returns: List[NotificationInfo] where each NotificationInfo contains:

  • topic (Optional[str]) - Notification topic, or None for default topic
  • message (Any) - Deserialized message content
  • created_at_epoch_ms (int) - Message creation timestamp in milliseconds
  • consumed (bool) - Whether the message has been consumed by recv()

Use Cases:

  • Monitoring inter-workflow communication
  • Debugging notification delivery and consumption
  • Auditing message flow between workflows

get_all_stream_entries()#

The get_all_stream_entries(workflow_id: str) method retrieves all stream entries for a workflow from the streams table. It returns a dictionary mapping stream keys to lists of deserialized values ordered by offset.

Returns: Dict[str, List[Any]] - Dictionary mapping stream keys to ordered lists of values

Use Cases:

  • Inspecting stream data produced by workflows
  • Debugging stream-based workflow patterns
  • Analyzing workflow data flow

Note: Stream closed sentinel values (__DBOS_STREAM_CLOSED__) are automatically filtered out from the results.
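
The shape of the returned dictionary can be sketched as follows. This is only an illustration: group_stream_entries is a hypothetical helper, and it assumes values are JSON-serialized and that the closed sentinel is stored as the raw string shown above, neither of which is confirmed by this page.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

# Sentinel name taken from the note above; how it is stored is an assumption.
STREAM_CLOSED_SENTINEL = "__DBOS_STREAM_CLOSED__"


def group_stream_entries(rows: Iterable[Tuple[str, int, str]]) -> Dict[str, List[Any]]:
    """Group (key, offset, serialized_value) rows into per-key lists ordered by offset."""
    streams: Dict[str, List[Any]] = defaultdict(list)
    for key, _offset, value in sorted(rows, key=lambda r: (r[0], r[1])):
        if value == STREAM_CLOSED_SENTINEL:
            continue  # closed markers are not user data
        streams[key].append(json.loads(value))
    return dict(streams)
```

For example, rows for a "progress" stream at offsets 1, 0, and 2 (the last being the sentinel) would yield a single list with the two real values in offset order.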

workflow_schedules#

The workflow_schedules table stores scheduled/recurring workflow configurations.

Primary Key: schedule_id (TEXT)

Columns:

application_versions#

The application_versions table tracks application versions and their timestamps, enabling version management and determining the latest application version.

Primary Key: version_id (TEXT)

Columns:

  • version_id (TEXT) - Unique identifier for the version (UUID)
  • version_name (TEXT, NOT NULL, UNIQUE) - Human-readable version name
  • version_timestamp (BIGINT, NOT NULL) - Timestamp marking when this version was set as latest (milliseconds since epoch). Updated by set_latest_application_version() to mark a version as the current latest.
  • created_at (BIGINT, NOT NULL) - Immutable timestamp when the version was first registered (milliseconds since epoch). Set once when the version is created and never changed.

Purpose:

This table stores all registered application versions with their associated timestamps. The latest version is determined by the highest version_timestamp value. Versions are automatically registered in the database when an executor starts. Application version is computed by hashing workflow source code (MD5), or defaults to 'DEFAULT_VERSION' if source code inspection fails (with a warning). Users can manually set a custom version through the application_version field in DBOSConfig for consistent versioning in environments where source code inspection might not work reliably. Executors print a warning on startup if they are not running the latest version. Scheduled workflows are always enqueued to the latest application version.

The version can be managed through the DBOS API:

  • list_application_versions() — Returns all versions, newest first
  • get_latest_application_version() — Returns the latest version
  • set_latest_application_version(version_name) — Sets a version as latest by updating its version_timestamp to now (does not change created_at)
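
The "highest version_timestamp wins" rule can be sketched with sqlite3. The helper names below (register_version, set_latest, get_latest) are hypothetical and operate directly on a simplified table; they are not the DBOS API functions listed above.

```python
import sqlite3
from typing import Optional


def open_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE application_versions (
            version_id TEXT PRIMARY KEY,
            version_name TEXT NOT NULL UNIQUE,
            version_timestamp INTEGER NOT NULL,
            created_at INTEGER NOT NULL
        )
        """
    )
    return conn


def register_version(conn: sqlite3.Connection, version_id: str, name: str, now_ms: int) -> None:
    # Both timestamps start at registration time; re-registering a known name is a no-op.
    conn.execute(
        """
        INSERT INTO application_versions (version_id, version_name, version_timestamp, created_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (version_name) DO NOTHING
        """,
        (version_id, name, now_ms, now_ms),
    )
    conn.commit()


def set_latest(conn: sqlite3.Connection, name: str, now_ms: int) -> None:
    # Only version_timestamp moves; created_at stays immutable.
    conn.execute(
        "UPDATE application_versions SET version_timestamp = ? WHERE version_name = ?",
        (now_ms, name),
    )
    conn.commit()


def get_latest(conn: sqlite3.Connection) -> Optional[str]:
    row = conn.execute(
        "SELECT version_name FROM application_versions ORDER BY version_timestamp DESC LIMIT 1"
    ).fetchone()
    return None if row is None else row[0]
```

Registering v1 and then v2 makes v2 the latest; calling set_latest on v1 with a newer timestamp promotes v1 again without touching its created_at value.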

Default Values:

Both version_timestamp and created_at are set to the current time when a version is first registered:

PostgreSQL: (EXTRACT(epoch FROM now()) * 1000.0)::bigint

SQLite: CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)
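
To see what the SQLite default expression evaluates to, the following sketch runs it through sqlite3 and compares it with Python's own clock. The constant 2440587.5 is the Julian day number of the UNIX epoch, and 86400000 is the number of milliseconds in a day. The function names are hypothetical.

```python
import sqlite3
import time

# The SQLite default expression quoted above.
SQLITE_NOW_MS = "CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)"


def sqlite_now_ms() -> int:
    """Evaluate the default expression and return milliseconds since the UNIX epoch."""
    conn = sqlite3.connect(":memory:")
    (value,) = conn.execute(f"SELECT {SQLITE_NOW_MS}").fetchone()
    conn.close()
    return value


def python_now_ms() -> int:
    """The equivalent value computed in Python."""
    return int(time.time() * 1000)
```

The two values should agree to within the time it takes to run both calls, which confirms that the expression yields epoch milliseconds like the PostgreSQL variant.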

queues#

The queues table stores database-backed queue configurations and metadata. Database-backed queues enable dynamic queue modification and observability, and are the recommended approach for queue management. In-memory queues (not stored in the database) remain supported for backward compatibility but are deprecated.

Primary Key: queue_id (TEXT)

Columns:

  • queue_id (TEXT) - Unique queue identifier (UUID)
  • name (TEXT, NOT NULL, UNIQUE) - Queue name used to reference the queue in API calls
  • concurrency (INTEGER, nullable) - Global concurrency limit across all workers
  • worker_concurrency (INTEGER, nullable) - Per-worker concurrency limit
  • rate_limit_max (INTEGER, nullable) - Maximum number of workflows that can start within the rate limit period
  • rate_limit_period_sec (REAL/DOUBLE PRECISION, nullable) - Rate limit period in seconds
  • priority_enabled (BOOLEAN, NOT NULL, DEFAULT FALSE) - Whether workflows in this queue are ordered by priority
  • partition_queue (BOOLEAN, NOT NULL, DEFAULT FALSE) - Whether the queue is partitioned
  • polling_interval_sec (REAL/DOUBLE PRECISION, NOT NULL, DEFAULT 1.0) - Interval in seconds between queue polling operations
  • created_at (BIGINT, NOT NULL) - Timestamp when the queue was first registered (milliseconds since epoch)
  • updated_at (BIGINT, NOT NULL) - Timestamp of the last configuration update (milliseconds since epoch)

Purpose:

Introduced in migration 21, this table supports database-backed queues where configuration is persisted in the database rather than stored only in memory. This enables:

  • Dynamic reconfiguration of queue parameters (concurrency, rate limits, etc.) at runtime without restarting workers
  • Centralized queue management across distributed deployments
  • Observability of queue configurations through the DBOS client or dashboard
  • Queue configuration that survives application restarts

Database-backed queues are registered via DBOS.register_queue() and retrieved via DBOS.retrieve_queue(). Workers automatically reload queue configurations on each polling cycle, picking up changes without requiring a restart.

event_dispatch_kv#

The event_dispatch_kv table provides key-value storage for event dispatch. Note: This table only appears in PostgreSQL migrations and is not included in the SQLAlchemy schema definition.

Primary Key: (service_name, workflow_fn_name, key)

Columns:

  • service_name (TEXT)
  • workflow_fn_name (TEXT)
  • key (TEXT)
  • value (TEXT)
  • update_seq (NUMERIC(38,0))
  • update_time (NUMERIC(38,15))

Schema Migration System#

Migration Tracking#

DBOS uses a custom migration system tracked through a dbos_migrations table that stores a single version number representing the highest applied migration.

PostgreSQL: CREATE TABLE "{schema}".dbos_migrations (version BIGINT NOT NULL PRIMARY KEY)

SQLite: CREATE TABLE dbos_migrations (version INTEGER NOT NULL PRIMARY KEY)

Migration Execution#

Migrations are automatically executed during DBOS initialization when DBOS.launch() is called. The run_dbos_migrations() function:

  1. Queries the current version from dbos_migrations
  2. Iterates through all available migrations, skipping already applied ones
  3. Executes each pending migration SQL script in order
  4. Updates the version number in dbos_migrations after each successful migration

Each migration is executed in its own separate database transaction, so migrations are committed independently. This reduces contention when running migrations against an active database.
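
The steps above can be sketched as a small forward-only runner over SQLite. This is an illustrative sketch, not the actual run_dbos_migrations() code: it assumes each migration is a single SQL statement and that the connection was opened with isolation_level=None so that transactions are explicit.

```python
import sqlite3
from typing import Sequence


def run_migrations(conn: sqlite3.Connection, migrations: Sequence[str]) -> int:
    """Apply pending migrations in order and return the resulting version.

    The connection must be opened with isolation_level=None.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS dbos_migrations (version INTEGER NOT NULL PRIMARY KEY)"
    )
    row = conn.execute("SELECT version FROM dbos_migrations").fetchone()
    current = row[0] if row is not None else 0

    for version, sql in enumerate(migrations, start=1):
        if version <= current:
            continue  # already applied
        # Each migration and its version bump commit together, independently of the others.
        conn.execute("BEGIN")
        try:
            conn.execute(sql)
            conn.execute("DELETE FROM dbos_migrations")
            conn.execute("INSERT INTO dbos_migrations (version) VALUES (?)", (version,))
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
        current = version
    return current
```

Running the function twice with the same list is a no-op the second time, and appending a new migration applies only that one.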

Concurrent Migration Safety#

For PostgreSQL, the migration system uses an early-exit optimization combined with a PostgreSQL advisory lock to handle concurrent migration execution efficiently and safely.

Early Exit Optimization:

Before acquiring the advisory lock, the system checks whether migrations actually need to be run via the should_migrate() function. This function verifies:

  1. Whether the schema exists
  2. Whether the dbos_migrations table exists
  3. Whether the current migration version is up to date

If all checks pass (the schema is already at the latest version), the function returns immediately without acquiring the advisory lock at all. This optimization avoids unnecessary lock contention and improves startup performance for applications where the schema is already current.

Advisory Lock (Conditional):

The advisory lock is only acquired when migrations actually need to be performed. When should_migrate() returns true, the system attempts to acquire advisory lock ID 1234567890 using pg_try_advisory_lock(). The system polls for the lock once per second with a 30-second timeout. If the lock cannot be acquired within this timeout, the system logs a warning and proceeds to run migrations without the lock rather than blocking indefinitely. The lock is released in a finally block using pg_advisory_unlock() after migrations complete.

This mechanism prevents contention when multiple DBOS instances launch concurrently on a fresh database. Only one instance performs the migration at a time, while other instances wait for the lock. The timeout prevents startup from hanging if a previous process crashed while holding the lock and Postgres is slow to release it. This is handled automatically during system database initialization and requires no user configuration or intervention.

Fallback behavior: If the PostgreSQL implementation does not support advisory locks (some managed services may not), the system gracefully falls back to running migrations without the lock. The migration logic catches exceptions from the pg_try_advisory_lock() call and proceeds with migrations regardless, ensuring compatibility across different PostgreSQL environments.
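
The polling, timeout, and fallback behavior described in this section can be sketched independently of any database driver. In the sketch below, try_lock, unlock, and migrate are hypothetical callables supplied by the caller (for PostgreSQL, try_lock would wrap a pg_try_advisory_lock() query); the function names are illustrative and not taken from the DBOS source.

```python
import time
from typing import Callable


def acquire_with_timeout(
    try_lock: Callable[[], bool],
    timeout_sec: float = 30.0,
    poll_interval_sec: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll try_lock until it succeeds or the timeout elapses. Return True if locked."""
    deadline = clock() + timeout_sec
    while True:
        try:
            if try_lock():
                return True
        except Exception:
            # Advisory locks unsupported: fall back to running without the lock.
            return False
        if clock() >= deadline:
            return False  # timed out: caller proceeds without the lock
        sleep(poll_interval_sec)


def migrate_with_lock(
    try_lock: Callable[[], bool],
    unlock: Callable[[], None],
    migrate: Callable[[], None],
    **lock_kwargs,
) -> bool:
    """Run migrate() under the lock when possible; always release a lock that was taken."""
    locked = acquire_with_timeout(try_lock, **lock_kwargs)
    try:
        migrate()
    finally:
        if locked:
            unlock()
    return locked
```

Injecting sleep and clock keeps the polling loop testable without waiting for real time to pass.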

PostgreSQL Migrations#

39 migrations are defined for PostgreSQL via get_dbos_migrations():

  1. Initial schema creation - Creates all core tables, indexes, and optional LISTEN/NOTIFY triggers

  2. Add queue_partition_key column

  3. Add queue status index

  4. Add forked_from column and index

  5. Add operation timing columns

  6. Create workflow_events_history and add function_id to streams

  7. Add owner_xid

  8. Add parent_workflow_id and index

  9. Create workflow_schedules table

  10. Add primary key to notifications - Adds a primary key constraint to the notifications table on message_uuid. The migration runner checks information_schema.table_constraints in Python code before executing the SQL to determine if a primary key already exists on the table, and skips the migration entirely if one is found. This approach ensures compatibility with databases like CockroachDB that don't support PostgreSQL's DO $$ ... END $$ syntax. The SQL itself is a simple ALTER TABLE statement:

    ALTER TABLE "{schema}".notifications ADD PRIMARY KEY (message_uuid);
    
  11. Add serialization columns to all tables

  12. Add consumed column and idx_notifications index to notifications table

  13. Create application_versions table with version_id, version_name, version_timestamp, and created_at columns

  14. Create PostgreSQL stored functions for SQL-based workflow operations:

    • enqueue_workflow - Direct SQL-based workflow enqueueing with the following parameters:

      • workflow_name (TEXT, required) - The name of the workflow to enqueue
      • queue_name (TEXT, required) - The queue name for the workflow
      • positional_args (JSON[], optional, default empty array) - Positional arguments for the workflow
      • named_args (JSON, optional, default empty object) - Named arguments for the workflow
      • class_name (TEXT, optional) - The class name if using class-based workflows
      • config_name (TEXT, optional) - Configuration name for the workflow
      • workflow_id (TEXT, optional) - Custom workflow ID (auto-generated if not provided)
      • app_version (TEXT, optional) - Application version string
      • timeout_ms (BIGINT, optional) - Timeout in milliseconds
      • deadline_epoch_ms (BIGINT, optional) - Absolute deadline timestamp
      • deduplication_id (TEXT, optional) - ID for deduplication within a queue
      • priority (INTEGER, optional, default 0) - Priority for queue ordering
      • queue_partition_key (TEXT, optional) - Partition key for queue distribution

      Returns: workflow_id (TEXT) - The UUID of the enqueued workflow

      The function inserts workflows with 'ENQUEUED' status, validates parameters, handles deduplication, and ensures workflow metadata consistency on conflicts.

    • send_message - Direct SQL-based message sending to workflows:

      • destination_id (TEXT, required) - The workflow UUID to send the message to
      • message (JSON, required) - The message payload
      • topic (TEXT, optional, default 'null__topic') - Message topic for filtering
      • message_id (TEXT, optional, auto-generated) - Unique message identifier

      Returns: VOID

      The function inserts notifications into the notifications table and handles deduplication via message_id.

  15. Add scheduler enhancement columns to workflow_schedules table: last_fired_at (TEXT), automatic_backfill (BOOLEAN), and cron_timezone (TEXT)

  16. Add delayed workflow support: delay_until_epoch_ms column to workflow_status table and idx_workflow_status_delayed partial index for efficiently querying delayed workflows ready to transition to ENQUEUED

  17. Add queue_name column to workflow_schedules table

  18. Add was_forked_from column to workflow_status table

  19. Create idx_operation_outputs_completed_at_function_name index on operation_outputs table with columns (completed_at_epoch_ms, function_name)

  20. Set the search_path for the enqueue_workflow and send_message functions to pg_catalog and pg_temp for security purposes. Also sets search_path for notifications_function and workflow_events_function when LISTEN/NOTIFY is enabled.

  21. Create queues table with columns: queue_id (primary key, auto-generated UUID), name (unique), concurrency, worker_concurrency, rate_limit_max, rate_limit_period_sec, priority_enabled, partition_queue, polling_interval_sec, created_at, updated_at

  22. Optimize idx_workflow_status_forked_from index by making it a partial index (WHERE forked_from IS NOT NULL)

  23. Optimize idx_workflow_status_parent_workflow_id index by making it a partial index (WHERE parent_workflow_id IS NOT NULL)

  24. Add completed_at column to workflow_status table and create idx_workflow_status_completed_at partial index for efficient querying of workflows by completion time

  25. through 37. Additional migrations for various schema enhancements

  26. Recreate enqueue_workflow stored function with expanded signature including three new optional parameters:

    • authenticated_user (TEXT, optional) - The authenticated user for the workflow
    • authenticated_roles (TEXT, optional) - The authenticated roles for the workflow (JSON-encoded)
    • delay_until_epoch_ms (BIGINT, optional) - If provided, workflow starts with DELAYED status instead of ENQUEUED; validated to be >= 0

    The recreated function also sets its search_path to pg_catalog and pg_temp for security.

  27. Create streams LISTEN/NOTIFY infrastructure (gated on use_listen_notify):

    • Create streams_function() that triggers pg_notify('dbos_streams_channel', payload) where payload is workflow_uuid || '::' || key
    • Set search_path for streams_function() to pg_catalog, pg_temp
    • Create dbos_streams_trigger on the streams table that fires AFTER INSERT FOR EACH ROW and executes streams_function()

    This migration follows the same pattern as the notifications and workflow_events LISTEN/NOTIFY infrastructure (migrations 1 and 20), enabling low-latency delivery of stream data to waiting readers. Deployments without LISTEN/NOTIFY (e.g., CockroachDB) set use_listen_notify=False and use the polling fallback.

SQLite Migrations#

24 migrations are defined for SQLite (no migration 10, 14, or 20). They parallel the PostgreSQL migrations with SQLite-specific syntax adaptations. The SQLite version does not include the event_dispatch_kv table.

  • Migration 13 creates the application_versions table with the same columns as PostgreSQL.
  • Migration 15 adds the same scheduler enhancement columns as PostgreSQL: last_fired_at (TEXT), automatic_backfill (BOOLEAN), and cron_timezone (TEXT).
  • Migration 16 adds the same delayed workflow support as PostgreSQL: the delay_until_epoch_ms column and the idx_workflow_status_delayed index.
  • Migration 17 adds the queue_name column to the workflow_schedules table.
  • Migration 18 adds the was_forked_from column to the workflow_status table.
  • Migration 19 creates the idx_operation_outputs_completed_at_function_name index on the operation_outputs table with columns (completed_at_epoch_ms, function_name).
  • Migration 21 creates the queues table with the same structure as PostgreSQL, using SQLite-specific syntax for timestamps and UUID generation.
  • Migrations 22 and 23 optimize the idx_workflow_status_forked_from and idx_workflow_status_parent_workflow_id indexes by making them partial indexes (WHERE forked_from IS NOT NULL and WHERE parent_workflow_id IS NOT NULL, respectively).
  • Migration 24 adds the completed_at column to the workflow_status table and creates the idx_workflow_status_completed_at partial index for efficient querying of workflows by completion time.

Note: SQLite does not have an equivalent for PostgreSQL migration 14 (stored functions enqueue_workflow and send_message) or migration 20 (setting search_path on database functions), as these are PostgreSQL-specific features.

Limitations#

DBOS does not support rollback or downgrade functionality. The migration system only supports forward migrations with no logic for reverting to previous schema versions.

Configuration#

System Database URL#

The system database is configured via the system_database_url parameter in DBOSConfig:

PostgreSQL: postgresql://user:password@host:port/dbname_dbos_sys

SQLite: sqlite:///myapp.sqlite

If not provided, defaults to sqlite:///{name}.sqlite.

If only an application database URL is provided, DBOS automatically constructs the system database URL by appending _dbos_sys to the PostgreSQL database name or using the same SQLite file.
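
The derivation rule can be sketched with the standard library's URL parser. derive_system_database_url is a hypothetical helper written for illustration; the real logic in DBOS may handle more cases.

```python
from urllib.parse import urlsplit, urlunsplit


def derive_system_database_url(app_database_url: str) -> str:
    """Derive a system database URL from an application database URL.

    SQLite URLs are reused as-is (same file); for other URLs, "_dbos_sys"
    is appended to the database name in the path.
    """
    parts = urlsplit(app_database_url)
    if parts.scheme.startswith("sqlite"):
        return app_database_url
    db_name = parts.path.lstrip("/")
    # Credentials, host, port, and query parameters are carried over unchanged.
    return urlunsplit(parts._replace(path=f"/{db_name}_dbos_sys"))
```

For instance, an application URL whose database name is myapp maps to a system URL whose database name is myapp_dbos_sys, with the rest of the URL untouched.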

Schema Name#

The dbos_system_schema parameter specifies the schema name for system tables (PostgreSQL only). Default: "dbos".

Connection Pooling#

sys_db_pool_size controls the system database connection pool size. Default: 20 connections.

Default engine configuration:

{
    "pool_size": 20,
    "max_overflow": 0,
    "pool_timeout": 30,
    "pool_pre_ping": True,
    "connect_args": {
        "application_name": "dbos_transact",
        "connect_timeout": 10
    }
}

NullPool Support#

DBOS supports disabling connection pooling using SQLAlchemy's NullPool. When poolclass is set to NullPool in db_engine_kwargs, DBOS automatically removes the standard pool-related parameters since they are incompatible with NullPool:

  • pool_timeout
  • max_overflow
  • pool_size
  • pool_pre_ping

NullPool creates a new database connection for each request and closes it immediately after use, effectively disabling connection pooling. This is useful for scenarios where connection pooling is not desired, such as:

  • Using external connection poolers (e.g., PgBouncer)
  • Serverless environments
  • Testing scenarios

The NullPool behavior applies to both the application database and system database configurations.

Example configuration (Python):

from sqlalchemy.pool import NullPool

config["db_engine_kwargs"] = {"poolclass": NullPool}

Example configuration (YAML):

database:
  db_engine_kwargs:
    poolclass: NullPool
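
The parameter stripping described in this section can be sketched as a small function. To keep the example free of third-party imports, the NullPool class below is a local stand-in for sqlalchemy.pool.NullPool, and strip_pool_kwargs is a hypothetical name, not the function DBOS uses.

```python
from typing import Any, Dict


class NullPool:
    """Local stand-in for sqlalchemy.pool.NullPool, used only for this sketch."""


# The pool-related parameters listed above.
POOL_ONLY_KEYS = ("pool_timeout", "max_overflow", "pool_size", "pool_pre_ping")


def strip_pool_kwargs(engine_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of engine_kwargs without pool parameters when NullPool is selected."""
    if engine_kwargs.get("poolclass") is not NullPool:
        return dict(engine_kwargs)
    return {k: v for k, v in engine_kwargs.items() if k not in POOL_ONLY_KEYS}
```

With poolclass set to NullPool, only non-pool settings such as connect_args survive; without it, the defaults pass through unchanged.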

Configuration File Example#

name: "my-application"
system_database_url: "postgresql://user:pass@localhost/myapp_dbos_sys"
dbos_system_schema: "dbos"

database:
  sys_db_pool_size: 30
  db_engine_kwargs:
    pool_timeout: 60

Environment Variables#

For DBOS Cloud deployments, configuration is overridden by environment variables:

  • DBOS_SYSTEM_DATABASE_URL - System database connection string
  • DBOS_DATABASE_URL - Application database connection string

Database Backends#

PostgreSQL#

PostgreSQL is the recommended backend for production. The system:

SQLite#

SQLite is supported for development and testing. The system:

Workflow State Management#

Initialization#

When a workflow starts, the init_workflow method calls _insert_workflow_status to create or update the workflow record using PostgreSQL upsert semantics (INSERT ... ON CONFLICT DO UPDATE).

Outcome Recording#

When a workflow completes, update_workflow_outcome updates the status, output, or error fields in workflow_status, and sets the completed_at timestamp to the current time (in milliseconds since epoch) for workflows that have reached a terminal state (SUCCESS, ERROR, or CANCELLED).

Step Results#

Individual operation results are recorded by record_operation_result, which calls _record_operation_result_txn to insert into operation_outputs using INSERT ... ON CONFLICT DO UPDATE. When a conflict occurs on the composite primary key (workflow_uuid, function_id), the system updates the completed_at_epoch_ms timestamp to ensure the completion time is properly recorded even when the same operation result is inserted multiple times due to retries or concurrent workflow execution.
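
A minimal sketch of this upsert using sqlite3 and a trimmed-down operation_outputs table. The function below is a simplified stand-in for the behavior described, and it assumes that only completed_at_epoch_ms is overwritten on conflict while the originally recorded output is kept.

```python
import sqlite3


def open_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE operation_outputs (
            workflow_uuid TEXT NOT NULL,
            function_id INTEGER NOT NULL,
            function_name TEXT NOT NULL,
            output TEXT,
            completed_at_epoch_ms INTEGER,
            PRIMARY KEY (workflow_uuid, function_id)
        )
        """
    )
    return conn


def record_operation_result(
    conn: sqlite3.Connection,
    workflow_uuid: str,
    function_id: int,
    function_name: str,
    output: str,
    completed_at_epoch_ms: int,
) -> None:
    """Insert a step result; on a duplicate step, refresh only the completion time."""
    conn.execute(
        """
        INSERT INTO operation_outputs
            (workflow_uuid, function_id, function_name, output, completed_at_epoch_ms)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (workflow_uuid, function_id)
        DO UPDATE SET completed_at_epoch_ms = excluded.completed_at_epoch_ms
        """,
        (workflow_uuid, function_id, function_name, output, completed_at_epoch_ms),
    )
    conn.commit()
```

Recording the same (workflow_uuid, function_id) pair twice leaves one row whose completion time reflects the second call.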

Queue Management#

Workflow queues are managed through the queue_name field in workflow_status. The start_queued_workflows method:

  1. Retrieves workflows with status ENQUEUED
  2. Orders by priority (ascending) and creation time (FIFO)
  3. Updates status to PENDING when dequeuing
  4. Enforces deduplication via the unique constraint on (queue_name, deduplication_id)

Workflows can be enqueued with a delay by setting the delay_until_epoch_ms field. Delayed workflows initially have DELAYED status and do not execute until the delay period expires. The transition_delayed_workflows method periodically checks for DELAYED workflows whose delay_until_epoch_ms timestamp has passed and transitions them to ENQUEUED status, making them eligible for dequeuing and execution.
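
The dequeue steps and the delayed transition described above can be sketched with sqlite3. The functions transition_delayed and dequeue are simplified, hypothetical stand-ins for transition_delayed_workflows and start_queued_workflows; the created_at column name is an assumption, and concurrency limits, rate limits, and deduplication are left out.

```python
import sqlite3
from typing import List


def open_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE workflow_status (
            workflow_uuid TEXT PRIMARY KEY,
            status TEXT NOT NULL,
            queue_name TEXT,
            priority INTEGER NOT NULL DEFAULT 0,
            created_at INTEGER NOT NULL,        -- assumed column name for creation time
            delay_until_epoch_ms INTEGER
        )
        """
    )
    return conn


def transition_delayed(conn: sqlite3.Connection, now_ms: int) -> int:
    """Move DELAYED workflows whose delay has passed to ENQUEUED; return how many moved."""
    with conn:
        cur = conn.execute(
            """
            UPDATE workflow_status SET status = 'ENQUEUED'
            WHERE status = 'DELAYED' AND delay_until_epoch_ms <= ?
            """,
            (now_ms,),
        )
    return cur.rowcount


def dequeue(conn: sqlite3.Connection, queue_name: str, limit: int) -> List[str]:
    """Pick ENQUEUED workflows by priority, then creation time, and mark them PENDING."""
    with conn:
        ids = [
            row[0]
            for row in conn.execute(
                """
                SELECT workflow_uuid FROM workflow_status
                WHERE queue_name = ? AND status = 'ENQUEUED'
                ORDER BY priority ASC, created_at ASC
                LIMIT ?
                """,
                (queue_name, limit),
            )
        ]
        conn.executemany(
            "UPDATE workflow_status SET status = 'PENDING' WHERE workflow_uuid = ?",
            [(wf_id,) for wf_id in ids],
        )
    return ids
```

A workflow that was DELAYED becomes eligible for dequeue only after transition_delayed has run with a current time at or past its delay_until_epoch_ms value.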

The queue_name and queue_partition_key fields can be set not only during initial workflow creation but also when resuming or forking workflows. This provides flexibility in workflow orchestration and queue management:

  • Workflow Resumption: When resuming workflows via resume_workflow() or resume_workflows(), an optional queue_name parameter can be provided to place the resumed workflow on a specific queue. If not specified, resumed workflows default to the INTERNAL_QUEUE_NAME.

  • Workflow Forking: When forking workflows via fork_workflow(), optional queue_name and queue_partition_key parameters can be provided to place the forked workflow on a specific queue with a specific partition key. If queue_name is not specified, forked workflows default to the INTERNAL_QUEUE_NAME.

Recovery#

The recovery_attempts field tracks retry counts. When max recovery attempts are exceeded, the workflow status is set to MAX_RECOVERY_ATTEMPTS_EXCEEDED, effectively moving it to a "dead letter queue" state where it won't be automatically recovered.

Design Rationale#

Persistence-First Architecture#

Schedules and workflow state persist in the database and survive application restarts, distinguishing dynamic scheduling from decorator-based scheduling.

Exactly-Once Semantics#

The system implements exactly-once semantics through database upserts with conflict resolution on workflow_uuid.

Distributed Execution#

Changes to schedules propagate to all workers in distributed deployments on their next poll, because DBOS periodically polls the database for updates.

Foreign Key Cascading#

ON DELETE CASCADE is used throughout to automatically clean up related records when workflows are deleted.

Separation of Concerns#

The system maintains clear separation between the system database (framework metadata) and application database (user data), each with its own connection URL, pool, and schema.

Relevant Code Files#

  • dbos/_schemas/system_database.py - SQLAlchemy table definitions for all system tables
  • dbos/_migration.py - SQL DDL migration scripts for PostgreSQL and SQLite
  • dbos/_sys_db.py - System database operations and workflow state management
  • dbos/_sys_db_postgres.py - PostgreSQL-specific system database implementation
  • dbos/_sys_db_sqlite.py - SQLite-specific system database implementation
  • dbos/_dbos_config.py - Configuration management including database URLs and pooling
  • dbos/_dbos.py - Main DBOS class with startup and migration execution
  • dbos/_scheduler.py - Dynamic scheduler implementation using workflow_schedules
  • dbos/cli/migration.py - Programmatic migration API
  • tests/test_schema_migration.py - Schema migration test suite