Documents
Class-Based Workflows
Class-Based Workflows
Type
Topic
Status
Published
Created
Feb 24, 2026
Updated
Mar 23, 2026
Created by
Dosu Bot
Updated by
Dosu Bot

Class-Based Workflows#

Overview#

Class-Based Workflows in DBOS Python enable developers to define workflows as static class methods in addition to traditional module-level function workflows. This architectural pattern allows for better code organization by grouping related workflow logic within classes while maintaining full integration with the DBOS scheduling and execution infrastructure. The feature was introduced in PR #589 on February 18, 2026, building upon the dynamic workflow scheduling infrastructure established in PR #581.

The class-based workflow system supports both static and class methods as workflow definitions, though only static class methods can be scheduled. Instance methods of configured classes, while usable for workflow execution, cannot be registered with the scheduler due to the requirement that the scheduler must be able to invoke workflows without access to object instances. This design decision maintains consistency with the stateless nature of scheduled job execution.

Class-based workflows integrate seamlessly with both decorator-based scheduling (@DBOS.scheduled()) and dynamic scheduling APIs (DBOS.create_schedule()). The system automatically tracks class metadata through the workflow_class_name field in the workflow schedules database table, enabling proper workflow resolution and invocation across application restarts and distributed deployments.

Workflow Definition Patterns#

DBOS Python supports two primary patterns for defining workflows:

Function-Based Workflows#

Traditional module-level functions decorated with @DBOS.workflow() provide a straightforward approach for simple workflow definitions. These workflows are standalone functions that do not require class context.

Class-Based Workflows#

Workflows defined as class methods offer improved code organization and modularity. All classes containing DBOS workflows must be decorated with @DBOS.dbos_class(), which ensures proper registration with the DBOS runtime and provides class-level configuration metadata. Class-based workflows can be implemented as:

  • Static methods: Self-contained workflows that do not require class or instance state
  • Class methods: Workflows that may need access to class-level attributes
  • Instance methods: Workflows that require instance-specific state and configuration

Instance methods with DBOS decorators require the class to inherit from DBOSConfiguredInstance, which provides a framework for managing configured instances with unique identifiers used during workflow recovery.

The @DBOS.dbos_class() Decorator#

The @DBOS.dbos_class() decorator serves as the registration mechanism for all classes containing DBOS workflow, transaction, or step functions. This decorator ensures that functions are properly registered with class-level configuration information and enables the DBOS runtime to correctly resolve and invoke class-based workflows.

The decorator accepts an optional class_name parameter that defaults to cls.__qualname__, allowing developers to customize the registered name used for workflow identification in the database and logs.

Example usage:

from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.dbos_class()
class PaymentWorkflows:
    @staticmethod
    @DBOS.workflow()
    def process_payment(scheduled_at: datetime, ctx: Any) -> str:
        # Payment processing logic
        return "completed"

DBOSConfiguredInstance for Instance Methods#

Instance methods with workflow decorators require inheriting from DBOSConfiguredInstance, a base class that provides the infrastructure for managing configured workflow instances. Key requirements include:

This architecture enables DBOS to reconstruct the correct instance context when recovering workflows from the database after application restarts.

Example:

from dbos import DBOS, DBOSConfiguredInstance

@DBOS.dbos_class()
class URLFetcher(DBOSConfiguredInstance):
    def __init__(self, url: str):
        self.url = url
        super().__init__(config_name=url)

    @DBOS.workflow()
    def fetch_workflow(self) -> str:
        return self.fetch_url()

    @DBOS.step()
    def fetch_url(self) -> str:
        # Fetching logic
        return f"Content from {self.url}"

Function Type Classification#

DBOS uses the DBOSFuncType enumeration to distinguish between different method types during class registration and workflow execution. The classification is performed in the get_or_create_class_info() function in dbos/_registrations.py:

This classification determines how workflows are invoked during execution. For static methods, the class object is not prepended to the arguments, while instance and class methods have the appropriate class context resolved from the registry.

Database Storage and Metadata#

Scheduled workflows are persisted in the workflow_schedules table, which includes a dedicated workflow_class_name field to track class-based workflows. The schema includes:

  • schedule_id: Unique identifier for the schedule
  • schedule_name: Human-readable unique name
  • workflow_name: Fully-qualified workflow function name
  • workflow_class_name: Fully-qualified class name for static methods, or None for module-level functions
  • schedule: Cron expression supporting six fields (including seconds)
  • status: Current schedule status (ACTIVE, PAUSED, etc.)
  • context: JSON-serializable context object passed to each invocation
  • last_fired_at: ISO timestamp of the last execution, used for automatic backfill and monitoring
  • automatic_backfill: Boolean flag enabling automatic backfilling of missed executions on scheduler startup
  • cron_timezone: IANA timezone name (e.g., "America/New_York") for evaluating the cron expression, or None for UTC
  • queue_name: Target queue for scheduled workflow executions, or None for the internal queue

The workflow_class_name column is nullable for backward compatibility with existing module-level workflow schedules. Migration scripts were added in _migration.py for both PostgreSQL and SQLite to enable smooth upgrades.

Scheduled Workflow Function Signature#

Scheduled workflows, whether class-based or function-based, must follow a specific signature pattern. [Dynamic scheduling requires workflows to accept (scheduled_time: datetime, context: Any) parameters](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Critical Breaking Change: Function Signature), where [context can be any JSON-serializable value](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Comparison Table).

Example signature:

@staticmethod
@DBOS.workflow()
def scheduled_workflow(scheduled_time: datetime, context: Any) -> None:
    """Process scheduled task with provided context."""
    pass

Scheduler Integration#

Static vs. Dynamic Scheduling#

DBOS supports two scheduling approaches:

  1. Decorator-Based Scheduling: The @DBOS.scheduled() decorator defines schedules in code
  2. Dynamic Scheduling: The DBOS.create_schedule() API creates schedules at runtime

[Dynamic scheduling provides explicit support for class-based workflows through the workflow_class_name parameter](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Class Method Support) in schedule APIs. The workflow_class_name is automatically detected using get_func_info() during schedule creation and extracted from fi.class_info.registered_name if present.

Creating Class-Based Scheduled Workflows#

from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.dbos_class()
class DataSyncWorkflows:
    @staticmethod
    @DBOS.workflow()
    def sync_data(scheduled_time: datetime, context: Any) -> None:
        env = context.get("env", "production")
        # Sync logic here
        pass

# Create schedule with timezone and automatic backfill
DBOS.create_schedule(
    schedule_name="data-sync-schedule",
    workflow_fn=DataSyncWorkflows.sync_data,
    schedule="0 9 * * 1-5", # 9 AM weekdays
    context={"env": "production", "priority": "high"},
    cron_timezone="America/New_York",
    automatic_backfill=True,
)

The create_schedule() method accepts several optional parameters:

  • automatic_backfill: bool = False: When True, automatically backfills missed executions when the scheduler starts up. The scheduler uses the last_fired_at field to determine which executions were missed during downtime.
  • cron_timezone: Optional[str] = None: IANA timezone name (e.g., "America/New_York", "Europe/London", "Asia/Tokyo") for evaluating the cron expression. If None, uses UTC. This is useful for schedules tied to business hours in specific regions.
  • queue_name: Optional[str] = None: Optional name of a declared queue to enqueue scheduled workflows to. If None, uses the internal queue. Defaults to None. This allows managing concurrency of scheduled workflows.

Validation and Restrictions#

DBOS.create_schedule() and DBOS.apply_schedules() include validation logic that raises a DBOSException if instance methods are passed:

if fi and fi.func_type == DBOSFuncType.Instance:
    raise DBOSException(
        "Configured instance methods cannot be used as scheduled workflows"
    )

Test cases verify this behavior, ensuring the system fails fast with a clear error message when instance methods are incorrectly used for scheduling.

Lifecycle Management#

Dynamic schedules persist in the database and survive application restarts. [Unlike decorator-based schedules which only exist in code, removing a create_schedule() call does not stop a dynamic schedule](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Step 3: Handle Schedule Persistence). Schedules must be explicitly deleted using DBOS.delete_schedule() or removed via DBOS.apply_schedules().

[The dynamic scheduling API provides extensive lifecycle management capabilities](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Step 4: Use New Management APIs):

# List all schedules
schedules = DBOS.list_schedules()

# Retrieve specific schedule
sched = DBOS.get_schedule("data-sync-schedule")

# Pause execution (retains configuration)
DBOS.pause_schedule("data-sync-schedule")

# Resume a paused schedule
DBOS.resume_schedule("data-sync-schedule")

# Trigger immediate execution
DBOS.trigger_schedule("data-sync-schedule")

# Delete permanently
DBOS.delete_schedule("data-sync-schedule")

# Backfill missed executions
DBOS.backfill_schedule(
    schedule_name="data-sync-schedule",
    start_time=datetime(2026, 1, 1),
    end_time=datetime(2026, 1, 31),
)

# Atomic update of multiple schedules
DBOS.apply_schedules([
    ScheduleInput(
        schedule_name="schedule-1",
        workflow_fn=DataWorkflows.method_a,
        schedule="0 * * * *",
        context={"id": 1},
        automatic_backfill=True,
        cron_timezone="America/Los_Angeles",
        queue_name="high-priority-queue",
    ),
])

The scheduler automatically tracks when each schedule last executed via the last_fired_at field, which can be queried through DBOS.get_schedule() for monitoring purposes. This field is also used internally to enable automatic backfill functionality when automatic_backfill=True.

Polling and Distribution#

DBOS periodically polls the database for new or updated schedules, with each schedule managed by a dedicated thread. Changes to schedules are immediately reflected across all workers in distributed deployments, ensuring consistent behavior.

Implementation Details#

Class Registration and Detection#

Static class methods are detected during class registration by the get_or_create_class_info() function in dbos/_registrations.py. The function:

  1. Iterates through class attributes using isinstance(attribute, staticmethod) checks
  2. Sets the appropriate DBOSFuncType based on method type
  3. Associates class information with the function via the dbos_func_decorator_info attribute

Workflow Invocation#

The execute_workflow_by_id() function in _core.py handles class method invocation. The invocation logic:

  1. Retrieves workflow metadata from the database including class_name
  2. Resolves the class object from _registry.class_info_map for instance and class methods
  3. For static methods (DBOSFuncType.Static), does not prepend the class object to arguments
  4. For instance and class methods, prepends the class context appropriately

This design ensures that static methods maintain their standard Python semantics while enabling proper instance recovery for configured instance methods.

Scheduler Integration Details#

The _enqueue_scheduled_workflow function in _scheduler.py accepts a class_name parameter, which is passed to workflow status initialization. During schedule creation, validation raises a DBOSException for instance methods to prevent unsupported configurations from being persisted.

Reliability Features#

[PR #484 (October 3, 2025) added critical reliability features](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Reliability Improvements):

Async Support#

[PR #493 (October 20, 2025) enabled async scheduled workflows](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Async Support) for both decorator and dynamic scheduling patterns, allowing better handling of I/O-bound tasks in scheduled class-based workflows.

Usage Examples#

Basic Static Class Method Workflow#

Example from test_scheduler.py:

from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.dbos_class()
class MyScheduledClass:
    @staticmethod
    @DBOS.workflow()
    def scheduled_wf(scheduled_at: datetime, ctx: Any) -> None:
        print(f"Executing at {scheduled_at} with context: {ctx}")

# Create the schedule
DBOS.create_schedule(
    schedule_name="static-class-schedule",
    workflow_fn=MyScheduledClass.scheduled_wf,
    schedule="* * * * * *", # Every second
    context={"environment": "production"},
)

# Verify schedule creation
sched = DBOS.get_schedule("static-class-schedule")
assert sched["workflow_class_name"] == "MyScheduledClass"
assert sched["context"] == {"environment": "production"}

Multiple Static Methods in Single Class#

Example from test_classdecorators.py:

from dbos import DBOS
import sqlalchemy as sa

@DBOS.dbos_class()
class DataWorkflows:
    workflow_count = 0
    transaction_count = 0

    @staticmethod
    @DBOS.workflow()
    def process_data(input_data: str) -> str:
        DataWorkflows.workflow_count += 1

        # Call transaction and step methods
        db_result = DataWorkflows.save_to_db(input_data)
        processed = DataWorkflows.transform_data(input_data)

        return f"{db_result}-{processed}"

    @staticmethod
    @DBOS.transaction()
    def save_to_db(data: str) -> str:
        DataWorkflows.transaction_count += 1
        rows = DBOS.sql_session.execute(sa.text("SELECT 1")).fetchall()
        return f"{data}-{rows[0][0]}"

    @staticmethod
    @DBOS.step()
    def transform_data(data: str) -> str:
        return data.upper()

# Execute the workflow
result = DataWorkflows.process_data("test")
print(result) # Output: "test-1-TEST"

Instance Method with DBOSConfiguredInstance#

from dbos import DBOS, DBOSConfiguredInstance

@DBOS.dbos_class()
class APIClient(DBOSConfiguredInstance):
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        super().__init__(config_name=f"api-{base_url}")

    @DBOS.workflow()
    def fetch_data(self, endpoint: str) -> dict:
        url = f"{self.base_url}/{endpoint}"
        return self.make_request(url)

    @DBOS.step()
    def make_request(self, url: str) -> dict:
        # Simulated API request with authentication
        return {"url": url, "auth": self.api_key}

# Create configured instances before DBOS.launch()
prod_api = APIClient("prod-key", "https://api.prod.com")
dev_api = APIClient("dev-key", "https://api.dev.com")

# Use the workflow
result = prod_api.fetch_data("users")

Instance Method Scheduling Rejection#

Example from test_scheduler.py demonstrating validation:

import pytest
from dbos import DBOS, DBOSConfiguredInstance, DBOSException

@DBOS.dbos_class()
class InvalidScheduled(DBOSConfiguredInstance):
    def __init__(self):
        super().__init__("invalid-config")

    @DBOS.workflow()
    def instance_workflow(self, scheduled_at: datetime, ctx: Any) -> None:
        pass

inst = InvalidScheduled()

# This raises DBOSException
with pytest.raises(
    DBOSException, 
    match="Configured instance methods cannot be used"
):
    DBOS.create_schedule(
        schedule_name="invalid-schedule",
        workflow_fn=inst.instance_workflow,
        schedule="* * * * *",
    )

Use Cases and Best Practices#

Class-based workflows are recommended for:

  1. Code Organization: Grouping related workflows, transactions, and steps within a single class for improved maintainability
  2. Domain-Specific Managers: Creating specialized workflow managers like PaymentWorkflows, ReportingWorkflows, or NotificationWorkflows
  3. Shared Configuration: Workflows that benefit from shared static configuration or utility methods
  4. Modular Architecture: Building testable, reusable workflow components that can be easily composed
  5. Complex Orchestration: Managing multi-step workflows where all steps logically belong together

For scheduled workflows specifically, always use static methods to ensure compatibility with the scheduler system.

When to Use Automatic Backfill#

The automatic_backfill parameter is useful for critical scheduled tasks that must run even after downtime:

  • Use automatic backfill for critical business operations (e.g., daily financial reports, compliance tasks, data synchronization)
  • Avoid automatic backfill when backfilling could cause problems (e.g., sending multiple outdated notifications, overwhelming external APIs, or processing stale data)

When automatic backfill is enabled, the scheduler tracks the last_fired_at timestamp and automatically backfills all missed executions between that time and the current time when the application restarts.

When to Specify Cron Timezone#

The cron_timezone parameter is valuable for schedules tied to specific geographic regions:

  • Specify a timezone for business operations aligned to local time (e.g., daily reports at 9 AM local time, office hours automations)
  • Use UTC (default) for system operations, distributed services, or schedules independent of local business hours

Without a timezone specification, all cron expressions are evaluated in UTC. This ensures consistent behavior across distributed deployments but may not align with local business hours in different time zones.

Limitations#

  1. Scheduling Restriction: Only static class methods can be scheduled as workflows. Instance methods of DBOSConfiguredInstance classes cannot be registered with the scheduler.

  2. [Context Serialization: The context parameter for scheduled workflows must be JSON-serializable](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Comparison Table). Complex objects requiring custom serialization are not supported.

  3. Instance Timing: All DBOSConfiguredInstance instances must be created before calling DBOS.launch() to ensure proper registration in the recovery registry.

  4. Class Name Uniqueness: The registered class name (via class_name parameter or cls.__qualname__) must be unique within an application to avoid workflow resolution conflicts.

Relevant Code Files#

FileDescriptionURL
dbos/_registrations.pyClass registration and function type detectionView File
dbos/_dbos.pyDBOS main class with schedule creation and management APIsView File
dbos/_scheduler.pyDynamic scheduler implementation with class method supportView File
dbos/_scheduler_decorator.pyDecorator-based scheduler implementationView File
dbos/_core.pyCore workflow execution and class method invocation logicView File
dbos/_schemas/system_database.pyDatabase schema including workflow_schedules tableView File
tests/test_scheduler.pyTest cases for class-based scheduled workflowsView File
tests/test_classdecorators.pyTest cases for class-based workflow patternsView File

Historical Context#

The class-based workflow feature evolved through several key pull requests:

This progression demonstrates DBOS Python's evolution toward providing flexible, production-ready workflow scheduling capabilities with first-class support for object-oriented design patterns.