Class-Based Workflows#
Overview#
Class-Based Workflows in DBOS Python enable developers to define workflows as static class methods in addition to traditional module-level function workflows. This architectural pattern allows for better code organization by grouping related workflow logic within classes while maintaining full integration with the DBOS scheduling and execution infrastructure. The feature was introduced in PR #589 on February 18, 2026, building upon the dynamic workflow scheduling infrastructure established in PR #581.
The class-based workflow system supports static methods, class methods, and instance methods as workflow definitions, but only static class methods can be scheduled. Instance methods of configured classes can run as workflows, yet they cannot be registered with the scheduler, because the scheduler must be able to invoke a workflow without access to an object instance. This restriction keeps scheduled job execution stateless.
Class-based workflows integrate seamlessly with both decorator-based scheduling (@DBOS.scheduled()) and dynamic scheduling APIs (DBOS.create_schedule()). The system automatically tracks class metadata through the workflow_class_name field in the workflow schedules database table, enabling proper workflow resolution and invocation across application restarts and distributed deployments.
Workflow Definition Patterns#
DBOS Python supports two primary patterns for defining workflows:
Function-Based Workflows#
Traditional module-level functions decorated with @DBOS.workflow() provide a straightforward approach for simple workflow definitions. These workflows are standalone functions that do not require class context.
Class-Based Workflows#
Workflows defined as class methods offer improved code organization and modularity. All classes containing DBOS workflows must be decorated with @DBOS.dbos_class(), which ensures proper registration with the DBOS runtime and provides class-level configuration metadata. Class-based workflows can be implemented as:
- Static methods: Self-contained workflows that do not require class or instance state
- Class methods: Workflows that may need access to class-level attributes
- Instance methods: Workflows that require instance-specific state and configuration
Instance methods with DBOS decorators require the class to inherit from DBOSConfiguredInstance, which provides a framework for managing configured instances with unique identifiers used during workflow recovery.
The @DBOS.dbos_class() Decorator#
The @DBOS.dbos_class() decorator serves as the registration mechanism for all classes containing DBOS workflow, transaction, or step functions. This decorator ensures that functions are properly registered with class-level configuration information and enables the DBOS runtime to correctly resolve and invoke class-based workflows.
The decorator accepts an optional class_name parameter that defaults to cls.__qualname__, allowing developers to customize the registered name used for workflow identification in the database and logs.
Example usage:
from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.dbos_class()
class PaymentWorkflows:
    @staticmethod
    @DBOS.workflow()
    def process_payment(scheduled_at: datetime, ctx: Any) -> str:
        # Payment processing logic
        return "completed"
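The naming rule described above can be illustrated without DBOS at all. The following is a minimal sketch of a class decorator whose registered name defaults to cls.__qualname__. The names register_class and CLASS_REGISTRY are hypothetical stand-ins, not part of the DBOS API, and raising on a duplicate name is an assumption about how a uniqueness conflict might surface.

```python
from typing import Callable, Dict, Optional

# Hypothetical registry; a stand-in for whatever DBOS keeps internally.
CLASS_REGISTRY: Dict[str, type] = {}


def register_class(class_name: Optional[str] = None) -> Callable[[type], type]:
    """Toy class decorator: register a class under class_name or its __qualname__."""

    def decorator(cls: type) -> type:
        # Fall back to the qualified name when no explicit name is given.
        name = class_name if class_name is not None else cls.__qualname__
        if name in CLASS_REGISTRY:
            # Assumption: duplicate names are rejected at registration time.
            raise ValueError(f"Class name {name!r} is already registered")
        CLASS_REGISTRY[name] = cls
        return cls

    return decorator


@register_class()
class PaymentWorkflows:
    pass


@register_class(class_name="billing.Refunds")
class RefundWorkflows:
    pass
```

An explicit class_name is mainly useful when a class is renamed or moved but its previously stored name must stay stable.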
DBOSConfiguredInstance for Instance Methods#
Instance methods with workflow decorators require inheriting from DBOSConfiguredInstance, a base class that provides the infrastructure for managing configured workflow instances. Key requirements include:
- Each instance must provide a unique config_name during instantiation
- All instances must be created before DBOS.launch() is called
- The config_name is used for workflow recovery via a global registry
- Scheduled workflows are not supported for configured instances
This architecture enables DBOS to reconstruct the correct instance context when recovering workflows from the database after application restarts.
Example:
from dbos import DBOS, DBOSConfiguredInstance

@DBOS.dbos_class()
class URLFetcher(DBOSConfiguredInstance):
    def __init__(self, url: str):
        self.url = url
        super().__init__(config_name=url)

    @DBOS.workflow()
    def fetch_workflow(self) -> str:
        return self.fetch_url()

    @DBOS.step()
    def fetch_url(self) -> str:
        # Fetching logic
        return f"Content from {self.url}"
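To show why a unique config_name matters for recovery, here is a small DBOS-free sketch of a registry keyed by class name and config_name. ConfiguredBase, its _instances dictionary, and the lookup method are hypothetical; the real DBOSConfiguredInstance registry may be organized differently.

```python
from typing import Dict, Tuple


class ConfiguredBase:
    """Toy stand-in for a configured-instance base class (not the DBOS one)."""

    # Hypothetical global registry keyed by (class name, config_name).
    _instances: Dict[Tuple[str, str], "ConfiguredBase"] = {}

    def __init__(self, config_name: str) -> None:
        key = (type(self).__qualname__, config_name)
        if key in ConfiguredBase._instances:
            # Assumption: a duplicate name is rejected when the instance is created.
            raise ValueError(f"Duplicate config_name {config_name!r} for {key[0]}")
        self.config_name = config_name
        ConfiguredBase._instances[key] = self

    @classmethod
    def lookup(cls, config_name: str) -> "ConfiguredBase":
        """Find the instance a recovered workflow should run against."""
        return ConfiguredBase._instances[(cls.__qualname__, config_name)]


class URLFetcher(ConfiguredBase):
    def __init__(self, url: str) -> None:
        self.url = url
        super().__init__(config_name=url)


fetcher = URLFetcher("https://example.com")
recovered = URLFetcher.lookup("https://example.com")  # same object as fetcher
```

Because recovery can only find instances that already exist in the registry, this sketch also suggests why instances need to be created before the runtime starts recovering workflows.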
Function Type Classification#
DBOS uses the DBOSFuncType enumeration to distinguish between different method types during class registration and workflow execution. The classification is performed in the get_or_create_class_info() function in dbos/_registrations.py:
- DBOSFuncType.Static: Set when isinstance(attribute, staticmethod) is true
- DBOSFuncType.Class: Set when isinstance(attribute, classmethod) is true
- DBOSFuncType.Instance: Set for regular instance methods
This classification determines how workflows are invoked during execution. For static methods, the class object is not prepended to the arguments, while instance and class methods have the appropriate class context resolved from the registry.
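The isinstance-based classification can be sketched with the standard library alone. FuncType and classify_methods below are hypothetical names that mirror the idea, not the actual contents of dbos/_registrations.py.

```python
from enum import Enum
from typing import Dict


class FuncType(Enum):
    """Hypothetical mirror of DBOSFuncType."""

    STATIC = "static"
    CLASS = "class"
    INSTANCE = "instance"


def classify_methods(cls: type) -> Dict[str, FuncType]:
    """Classify the functions defined directly on cls by their descriptor type."""
    result: Dict[str, FuncType] = {}
    # Read cls.__dict__ rather than using getattr: attribute access would
    # unwrap the staticmethod/classmethod objects and hide the distinction.
    for name, attribute in cls.__dict__.items():
        if isinstance(attribute, staticmethod):
            result[name] = FuncType.STATIC
        elif isinstance(attribute, classmethod):
            result[name] = FuncType.CLASS
        elif callable(attribute):
            result[name] = FuncType.INSTANCE
    return result


class Example:
    @staticmethod
    def a() -> None: ...

    @classmethod
    def b(cls) -> None: ...

    def c(self) -> None: ...


kinds = classify_methods(Example)
```

Here kinds maps "a" to STATIC, "b" to CLASS, and "c" to INSTANCE.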
Database Storage and Metadata#
Scheduled workflows are persisted in the workflow_schedules table, which includes a dedicated workflow_class_name field to track class-based workflows. The schema includes:
- schedule_id: Unique identifier for the schedule
- schedule_name: Human-readable unique name
- workflow_name: Fully-qualified workflow function name
- workflow_class_name: Fully-qualified class name for static methods, or None for module-level functions
- schedule: Cron expression supporting six fields (including seconds)
- status: Current schedule status (ACTIVE, PAUSED, etc.)
- context: JSON-serializable context object passed to each invocation
- last_fired_at: ISO timestamp of the last execution, used for automatic backfill and monitoring
- automatic_backfill: Boolean flag enabling automatic backfilling of missed executions on scheduler startup
- cron_timezone: IANA timezone name (e.g., "America/New_York") for evaluating the cron expression, or None for UTC
- queue_name: Target queue for scheduled workflow executions, or None for the internal queue
The workflow_class_name column is nullable for backward compatibility with existing module-level workflow schedules. Migration scripts were added in _migration.py for both PostgreSQL and SQLite to enable smooth upgrades.
Scheduled Workflow Function Signature#
Scheduled workflows, whether class-based or function-based, must follow a specific signature pattern. [Dynamic scheduling requires workflows to accept (scheduled_time: datetime, context: Any) parameters](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Critical Breaking Change: Function Signature), where [context can be any JSON-serializable value](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Comparison Table).
Example signature:
@staticmethod
@DBOS.workflow()
def scheduled_workflow(scheduled_time: datetime, context: Any) -> None:
    """Process scheduled task with provided context."""
    pass
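One way to check this shape before registering a schedule is to bind sample arguments against the function's signature. The helper accepts_schedule_args is hypothetical and is not part of DBOS; it relies only on the standard library's inspect module.

```python
import inspect
from datetime import datetime
from typing import Any, Callable


def accepts_schedule_args(fn: Callable[..., Any]) -> bool:
    """Return True if fn can be called as fn(scheduled_time, context)."""
    try:
        # bind() raises TypeError when the two positional arguments do not fit.
        inspect.signature(fn).bind(datetime.now(), {"env": "test"})
    except TypeError:
        return False
    return True


def good(scheduled_time: datetime, context: Any) -> None: ...


def missing_context(scheduled_time: datetime) -> None: ...


def no_args() -> None: ...
```

With these definitions, accepts_schedule_args(good) is True, while the other two functions are rejected.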
Scheduler Integration#
Static vs. Dynamic Scheduling#
DBOS supports two scheduling approaches:
- Decorator-Based Scheduling: The @DBOS.scheduled() decorator defines schedules in code
- Dynamic Scheduling: The DBOS.create_schedule() API creates schedules at runtime
[Dynamic scheduling provides explicit support for class-based workflows through the workflow_class_name parameter](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Class Method Support) in schedule APIs. The workflow_class_name is automatically detected using get_func_info() during schedule creation and extracted from fi.class_info.registered_name if present.
Creating Class-Based Scheduled Workflows#
from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.dbos_class()
class DataSyncWorkflows:
    @staticmethod
    @DBOS.workflow()
    def sync_data(scheduled_time: datetime, context: Any) -> None:
        env = context.get("env", "production")
        # Sync logic here
        pass

# Create schedule with timezone and automatic backfill
DBOS.create_schedule(
    schedule_name="data-sync-schedule",
    workflow_fn=DataSyncWorkflows.sync_data,
    schedule="0 9 * * 1-5",  # 9 AM weekdays
    context={"env": "production", "priority": "high"},
    cron_timezone="America/New_York",
    automatic_backfill=True,
)
The create_schedule() method accepts several optional parameters:
- automatic_backfill: bool = False: When True, automatically backfills missed executions when the scheduler starts up. The scheduler uses the last_fired_at field to determine which executions were missed during downtime.
- cron_timezone: Optional[str] = None: IANA timezone name (e.g., "America/New_York", "Europe/London", "Asia/Tokyo") for evaluating the cron expression. If None, uses UTC. This is useful for schedules tied to business hours in specific regions.
- queue_name: Optional[str] = None: Name of a declared queue to enqueue scheduled workflows to. If None, uses the internal queue. This allows managing concurrency of scheduled workflows.
Validation and Restrictions#
DBOS.create_schedule() and DBOS.apply_schedules() include validation logic that raises a DBOSException if instance methods are passed:
if fi and fi.func_type == DBOSFuncType.Instance:
    raise DBOSException(
        "Configured instance methods cannot be used as scheduled workflows"
    )
Test cases verify this behavior, ensuring the system fails fast with a clear error message when instance methods are incorrectly used for scheduling.
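The fail-fast behavior can be approximated without the DBOS registry. Note that this sketch swaps in a different technique: rather than reading a registered function type, it inspects whether the callable is a method bound to an instance. ScheduleError and check_schedulable are hypothetical names, and the sketch models only the rejection of instance methods.

```python
import inspect
from typing import Any, Callable


class ScheduleError(Exception):
    """Hypothetical stand-in for DBOSException."""


def check_schedulable(fn: Callable[..., Any]) -> None:
    """Raise ScheduleError if fn is a method bound to an object instance."""
    # A bound method whose __self__ is not a class is bound to an instance.
    if inspect.ismethod(fn) and not inspect.isclass(fn.__self__):
        raise ScheduleError(
            "Configured instance methods cannot be used as scheduled workflows"
        )


class Jobs:
    @staticmethod
    def nightly(scheduled_time: Any, context: Any) -> None: ...

    def per_instance(self, scheduled_time: Any, context: Any) -> None: ...


check_schedulable(Jobs.nightly)  # a static method passes silently

try:
    check_schedulable(Jobs().per_instance)
    rejected = False
except ScheduleError:
    rejected = True
```

Rejecting the function at schedule creation time means an unsupported configuration is never written to the database.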
Lifecycle Management#
Dynamic schedules persist in the database and survive application restarts. [Unlike decorator-based schedules which only exist in code, removing a create_schedule() call does not stop a dynamic schedule](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Step 3: Handle Schedule Persistence). Schedules must be explicitly deleted using DBOS.delete_schedule() or removed via DBOS.apply_schedules().
[The dynamic scheduling API provides extensive lifecycle management capabilities](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Step 4: Use New Management APIs):
# List all schedules
schedules = DBOS.list_schedules()

# Retrieve specific schedule
sched = DBOS.get_schedule("data-sync-schedule")

# Pause execution (retains configuration)
DBOS.pause_schedule("data-sync-schedule")

# Resume a paused schedule
DBOS.resume_schedule("data-sync-schedule")

# Trigger immediate execution
DBOS.trigger_schedule("data-sync-schedule")

# Delete permanently
DBOS.delete_schedule("data-sync-schedule")

# Backfill missed executions
DBOS.backfill_schedule(
    schedule_name="data-sync-schedule",
    start_time=datetime(2026, 1, 1),
    end_time=datetime(2026, 1, 31),
)

# Atomic update of multiple schedules
DBOS.apply_schedules([
    ScheduleInput(
        schedule_name="schedule-1",
        workflow_fn=DataWorkflows.method_a,
        schedule="0 * * * *",
        context={"id": 1},
        automatic_backfill=True,
        cron_timezone="America/Los_Angeles",
        queue_name="high-priority-queue",
    ),
])
The scheduler automatically tracks when each schedule last executed via the last_fired_at field, which can be queried through DBOS.get_schedule() for monitoring purposes. This field is also used internally to enable automatic backfill functionality when automatic_backfill=True.
Polling and Distribution#
DBOS periodically polls the database for new or updated schedules, with each schedule managed by a dedicated thread. Because every worker in a distributed deployment reads schedules from the same database, each worker picks up a change on its next poll, which keeps behavior consistent across the deployment.
Implementation Details#
Class Registration and Detection#
Static class methods are detected during class registration by the get_or_create_class_info() function in dbos/_registrations.py. The function:
- Iterates through class attributes using isinstance(attribute, staticmethod) checks
- Sets the appropriate DBOSFuncType based on method type
- Associates class information with the function via the dbos_func_decorator_info attribute
Workflow Invocation#
The execute_workflow_by_id() function in _core.py handles class method invocation. The invocation logic:
- Retrieves workflow metadata from the database including class_name
- Resolves the class object from _registry.class_info_map for instance and class methods
- For static methods (DBOSFuncType.Static), does not prepend the class object to arguments
- For instance and class methods, prepends the class context appropriately
This design ensures that static methods maintain their standard Python semantics while enabling proper instance recovery for configured instance methods.
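The argument-prepending rule can be sketched in a few lines. CLASS_MAP and invoke are hypothetical and much simpler than execute_workflow_by_id(); the sketch covers static and class methods only and leaves out instance recovery.

```python
from typing import Any, Callable, Dict, Tuple

# Hypothetical registry mapping registered class names to class objects.
CLASS_MAP: Dict[str, type] = {}


def invoke(
    raw_func: Callable[..., Any],
    func_type: str,
    class_name: str,
    args: Tuple[Any, ...],
) -> Any:
    """Call raw_func, prepending the class object only for class methods."""
    if func_type == "static":
        # Static methods keep plain-function semantics: no implicit first argument.
        return raw_func(*args)
    if func_type == "class":
        # Class methods receive the class resolved from the registry.
        return raw_func(CLASS_MAP[class_name], *args)
    raise ValueError(f"Unsupported function type in this sketch: {func_type}")


class Reports:
    prefix = "report"

    @staticmethod
    def ping(msg: str) -> str:
        return f"pong:{msg}"

    @classmethod
    def build(cls, day: str) -> str:
        return f"{cls.prefix}-{day}"


CLASS_MAP["Reports"] = Reports
# __func__ exposes the undecorated function behind each descriptor.
raw_ping = Reports.__dict__["ping"].__func__
raw_build = Reports.__dict__["build"].__func__

static_result = invoke(raw_ping, "static", "Reports", ("hi",))
class_result = invoke(raw_build, "class", "Reports", ("2026-02-18",))
```

In this sketch static_result is "pong:hi" and class_result is "report-2026-02-18".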
Scheduler Integration Details#
The _enqueue_scheduled_workflow function in _scheduler.py accepts a class_name parameter, which is passed to workflow status initialization. During schedule creation, validation raises a DBOSException for instance methods to prevent unsupported configurations from being persisted.
Reliability Features#
[PR #484 (October 3, 2025) added critical reliability features](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Reliability Improvements):
- Jitter of up to 10% of sleep time (capped at 10 seconds) to prevent thundering herd problems
- [Duplicate execution prevention in distributed environments](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Reliability Improvements)
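The jitter rule can be written as a short calculation. This is a sketch under stated assumptions: the function name jittered_sleep is hypothetical, and adding the jitter on top of the base sleep (rather than subtracting it) is an assumption, since the text above only gives the 10% bound and the 10-second cap.

```python
import random


def jittered_sleep(sleep_seconds: float, rng: random.Random) -> float:
    """Return the sleep time plus up to 10% jitter, with the jitter capped at 10 s."""
    max_jitter = min(0.1 * sleep_seconds, 10.0)
    # Assumption: jitter is added to the base sleep time.
    return sleep_seconds + rng.uniform(0.0, max_jitter)


rng = random.Random(0)
short = jittered_sleep(60.0, rng)    # jitter bound is 6 s
long = jittered_sleep(3600.0, rng)   # 10% would be 360 s, so the 10 s cap applies
```

Spreading wake-up times this way keeps many workers from hitting the database at the same instant.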
Async Support#
[PR #493 (October 20, 2025) enabled async scheduled workflows](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Async Support) for both decorator and dynamic scheduling patterns, allowing better handling of I/O-bound tasks in scheduled class-based workflows.
Usage Examples#
Basic Static Class Method Workflow#
Example from test_scheduler.py:
from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.dbos_class()
class MyScheduledClass:
    @staticmethod
    @DBOS.workflow()
    def scheduled_wf(scheduled_at: datetime, ctx: Any) -> None:
        print(f"Executing at {scheduled_at} with context: {ctx}")

# Create the schedule
DBOS.create_schedule(
    schedule_name="static-class-schedule",
    workflow_fn=MyScheduledClass.scheduled_wf,
    schedule="* * * * * *",  # Every second
    context={"environment": "production"},
)

# Verify schedule creation
sched = DBOS.get_schedule("static-class-schedule")
assert sched["workflow_class_name"] == "MyScheduledClass"
assert sched["context"] == {"environment": "production"}
Multiple Static Methods in Single Class#
Example from test_classdecorators.py:
from dbos import DBOS
import sqlalchemy as sa

@DBOS.dbos_class()
class DataWorkflows:
    workflow_count = 0
    transaction_count = 0

    @staticmethod
    @DBOS.workflow()
    def process_data(input_data: str) -> str:
        DataWorkflows.workflow_count += 1
        # Call transaction and step methods
        db_result = DataWorkflows.save_to_db(input_data)
        processed = DataWorkflows.transform_data(input_data)
        return f"{db_result}-{processed}"

    @staticmethod
    @DBOS.transaction()
    def save_to_db(data: str) -> str:
        DataWorkflows.transaction_count += 1
        rows = DBOS.sql_session.execute(sa.text("SELECT 1")).fetchall()
        return f"{data}-{rows[0][0]}"

    @staticmethod
    @DBOS.step()
    def transform_data(data: str) -> str:
        return data.upper()

# Execute the workflow
result = DataWorkflows.process_data("test")
print(result)  # Output: "test-1-TEST"
Instance Method with DBOSConfiguredInstance#
from dbos import DBOS, DBOSConfiguredInstance

@DBOS.dbos_class()
class APIClient(DBOSConfiguredInstance):
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        super().__init__(config_name=f"api-{base_url}")

    @DBOS.workflow()
    def fetch_data(self, endpoint: str) -> dict:
        url = f"{self.base_url}/{endpoint}"
        return self.make_request(url)

    @DBOS.step()
    def make_request(self, url: str) -> dict:
        # Simulated API request with authentication
        return {"url": url, "auth": self.api_key}

# Create configured instances before DBOS.launch()
prod_api = APIClient("prod-key", "https://api.prod.com")
dev_api = APIClient("dev-key", "https://api.dev.com")

# Use the workflow
result = prod_api.fetch_data("users")
Instance Method Scheduling Rejection#
Example from test_scheduler.py demonstrating validation:
from datetime import datetime
from typing import Any

import pytest
from dbos import DBOS, DBOSConfiguredInstance, DBOSException

@DBOS.dbos_class()
class InvalidScheduled(DBOSConfiguredInstance):
    def __init__(self):
        super().__init__("invalid-config")

    @DBOS.workflow()
    def instance_workflow(self, scheduled_at: datetime, ctx: Any) -> None:
        pass

inst = InvalidScheduled()

# This raises DBOSException
with pytest.raises(
    DBOSException,
    match="Configured instance methods cannot be used"
):
    DBOS.create_schedule(
        schedule_name="invalid-schedule",
        workflow_fn=inst.instance_workflow,
        schedule="* * * * *",
    )
Use Cases and Best Practices#
Class-based workflows are recommended for:
- Code Organization: Grouping related workflows, transactions, and steps within a single class for improved maintainability
- Domain-Specific Managers: Creating specialized workflow managers like PaymentWorkflows, ReportingWorkflows, or NotificationWorkflows
- Shared Configuration: Workflows that benefit from shared static configuration or utility methods
- Modular Architecture: Building testable, reusable workflow components that can be easily composed
- Complex Orchestration: Managing multi-step workflows where all steps logically belong together
For scheduled workflows specifically, always use static methods to ensure compatibility with the scheduler system.
When to Use Automatic Backfill#
The automatic_backfill parameter is useful for critical scheduled tasks that must run even after downtime:
- Use automatic backfill for critical business operations (e.g., daily financial reports, compliance tasks, data synchronization)
- Avoid automatic backfill when backfilling could cause problems (e.g., sending multiple outdated notifications, overwhelming external APIs, or processing stale data)
When automatic backfill is enabled, the scheduler tracks the last_fired_at timestamp and automatically backfills all missed executions between that time and the current time when the application restarts.
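The backfill window can be illustrated with a simplified model. The sketch below assumes a fixed interval instead of a cron expression, and missed_executions is a hypothetical helper, so it shows the idea of replaying everything between last_fired_at and the current time rather than the scheduler's actual algorithm.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def missed_executions(
    last_fired_at: datetime, now: datetime, interval: timedelta
) -> List[datetime]:
    """List fire times strictly after last_fired_at and no later than now."""
    missed: List[datetime] = []
    next_fire = last_fired_at + interval
    while next_fire <= now:
        missed.append(next_fire)
        next_fire += interval
    return missed


last = datetime(2026, 1, 1, 9, 0, tzinfo=timezone.utc)
now = datetime(2026, 1, 1, 12, 30, tzinfo=timezone.utc)
missed = missed_executions(last, now, timedelta(hours=1))
# missed holds the 10:00, 11:00 and 12:00 UTC runs
```

The length of this list grows with the downtime, which is why backfill is a poor fit for workflows that send notifications or call rate-limited APIs.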
When to Specify Cron Timezone#
The cron_timezone parameter is valuable for schedules tied to specific geographic regions:
- Specify a timezone for business operations aligned to local time (e.g., daily reports at 9 AM local time, office hours automations)
- Use UTC (default) for system operations, distributed services, or schedules independent of local business hours
Without a timezone specification, all cron expressions are evaluated in UTC. This ensures consistent behavior across distributed deployments but may not align with local business hours in different time zones.
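The practical effect of a timezone is easiest to see across a daylight saving change. This sketch uses the standard library's zoneinfo module (Python 3.9+, and it needs a time zone database on the system); local_fire_time_utc is a hypothetical helper and does not show how DBOS evaluates cron expressions.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def local_fire_time_utc(
    year: int, month: int, day: int, hour: int, tz_name: str
) -> datetime:
    """Convert a local wall-clock fire time in tz_name to UTC."""
    local = datetime(year, month, day, hour, tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


# 9 AM in New York maps to different UTC hours in winter and summer.
winter = local_fire_time_utc(2026, 1, 15, 9, "America/New_York")  # EST, UTC-5
summer = local_fire_time_utc(2026, 7, 15, 9, "America/New_York")  # EDT, UTC-4
```

A schedule written as a fixed UTC hour would therefore drift by one hour relative to local office time for part of the year.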
Limitations#
- Scheduling Restriction: Only static class methods can be scheduled as workflows. Instance methods of DBOSConfiguredInstance classes cannot be registered with the scheduler.
- [Context Serialization: The context parameter for scheduled workflows must be JSON-serializable](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Comparison Table). Complex objects requiring custom serialization are not supported.
- Instance Timing: All DBOSConfiguredInstance instances must be created before calling DBOS.launch() to ensure proper registration in the recovery registry.
- Class Name Uniqueness: The registered class name (via class_name parameter or cls.__qualname__) must be unique within an application to avoid workflow resolution conflicts.
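The context serialization limit can be checked ahead of time with the standard json module. The helper is_json_serializable is hypothetical, and DBOS may apply its own checks when a schedule is created.

```python
import json
from datetime import datetime
from typing import Any


def is_json_serializable(context: Any) -> bool:
    """Return True if json.dumps can encode the context without custom encoders."""
    try:
        json.dumps(context)
    except (TypeError, ValueError):
        return False
    return True


ok = is_json_serializable({"env": "production", "priority": "high"})
bad_datetime = is_json_serializable({"when": datetime(2026, 1, 1)})
bad_set = is_json_serializable({"ids": {1, 2, 3}})
```

Values such as datetime objects or sets fail this check, so they would need to be converted to strings or lists before being used as context.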
Relevant Code Files#
| File | Description |
|---|---|
| dbos/_registrations.py | Class registration and function type detection |
| dbos/_dbos.py | DBOS main class with schedule creation and management APIs |
| dbos/_scheduler.py | Dynamic scheduler implementation with class method support |
| dbos/_scheduler_decorator.py | Decorator-based scheduler implementation |
| dbos/_core.py | Core workflow execution and class method invocation logic |
| dbos/_schemas/system_database.py | Database schema including workflow_schedules table |
| tests/test_scheduler.py | Test cases for class-based scheduled workflows |
| tests/test_classdecorators.py | Test cases for class-based workflow patterns |
Related Topics#
- Workflow Decorators: The @DBOS.workflow() decorator and related function decorators
- Dynamic Workflow Scheduling: Runtime schedule creation with DBOS.create_schedule()
- Decorator-Based Scheduling: Code-defined schedules using @DBOS.scheduled()
- Working with Python Classes: Comprehensive guide to class-based DBOS patterns
- DBOSConfiguredInstance: Base class for instance method workflows
- Workflow Execution and Recovery: The DBOS workflow recovery mechanism
- Transaction and Step Decorators: Additional DBOS function decorators for database operations and idempotent steps
Historical Context#
The class-based workflow feature evolved through several key pull requests:
- PR #581 - Workflow Schedules (February 12, 2026): Introduced the foundational dynamic workflow scheduling infrastructure including the workflow_schedules table and ScheduleInput classes
- PR #589 - Support Classes in Scheduler (February 18, 2026): Added class-based workflow support by introducing the workflow_class_name field, automatic class detection, and validation for instance method scheduling
- [PR #493 - Async Support (October 20, 2025)](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Async Support): Enabled async/await patterns for scheduled workflows, applicable to both function-based and class-based workflows
- [PR #484 - Reliability Improvements (October 3, 2025)](https://app.dosu.dev/documents/835b0c31-bc93-4e86-9d19-c601a330961e#Reliability Improvements): Added jitter to prevent thundering herd problems and duplicate execution prevention for distributed deployments
This progression demonstrates DBOS Python's evolution toward providing flexible, production-ready workflow scheduling capabilities with first-class support for object-oriented design patterns.