DBOS Python Migration Guide: Decorator-Based to Dynamic Workflow Scheduling

Type: Document
Status: Published
Created: Feb 24, 2026 by Dosu Bot
Updated: Mar 23, 2026 by Dosu Bot

Migration Guide: Decorator-Based to Dynamic Workflow Scheduling#

Overview#

PR #581 (merged February 12, 2026) introduced a major rework of DBOS Python's scheduling system with a hybrid architecture supporting both:

  • The existing decorator-based scheduling (@DBOS.scheduled())
  • New dynamic, database-persisted scheduling (DBOS.create_schedule())

This guide helps you migrate from the old decorator-based pattern to the new dynamic scheduling API.

Critical Breaking Change: Function Signature#

This is the most important difference between the two approaches:

Old Decorator Approach#

Functions take (scheduled_time: datetime, actual_time: datetime)

The second parameter is the actual invocation time passed as datetime.now(timezone.utc).

New Dynamic Approach#

Functions take (scheduled_time: datetime, context: Any)

The second parameter is a custom context object that can be any JSON-serializable value.
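Because the context is persisted alongside the schedule, it must survive a JSON round trip. A quick standalone check (a sketch using only the standard library, not part of the DBOS API) can catch non-serializable values before a schedule is registered:

```python
import json

def is_json_serializable(value) -> bool:
    """Return True if value can be stored as schedule context."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False

print(is_json_serializable({"env": "prod", "retries": 3}))  # True
print(is_json_serializable({1, 2, 3}))  # False: sets are not JSON-serializable
```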

Comparison Table#

| Aspect | @DBOS.scheduled() (Old) | DBOS.create_schedule() (New) |
| --- | --- | --- |
| Function Signature | (datetime, datetime) | (datetime, Any) |
| Second Parameter | Actual invocation time | Custom context object |
| Context Support | None | Full support for any JSON-serializable value |
| Definition Time | Compile-time via decorator | Runtime via API call |
| Persistence | Code-only | Database-persisted |
| Survives Restart | No (requires code) | Yes (stored in DB) |
| Runtime Modification | Requires code change | Yes, via API |
| Lifecycle Management | Limited | Pause/resume/delete/backfill/trigger |
| Version Management | Tied to code version | Always runs on latest version |
| Automatic Backfill | No | Optional (on startup) |
| Timezone Support | UTC only | Configurable (IANA timezone names) |
| Queue Assignment | No | Optional queue_name parameter for concurrency management |
| Async Support | Yes | Yes |
| Class Method Support | Yes | Yes (via PR #589) |

Migration Steps#

Step 1: Update Function Signatures#

Before (Decorator Pattern):

@DBOS.scheduled("*/5 * * * * *")
@DBOS.workflow()
def my_workflow(scheduled: datetime, actual: datetime) -> None:
    # The second parameter is the actual execution time
    logger.info(f"Scheduled for {scheduled}, actually ran at {actual}")
    # ... workflow logic

After (Dynamic Pattern):

from datetime import datetime, timezone
from typing import Any

@DBOS.workflow()
def my_workflow(scheduled_time: datetime, context: Any) -> None:
    # The second parameter is now a custom context object
    # If you need the actual time, capture it manually
    actual_time = datetime.now(timezone.utc)

    logger.info(f"Scheduled for {scheduled_time}, context: {context}")
    logger.info(f"Actually ran at {actual_time}")
    # ... workflow logic

Step 2: Replace Decorator with API Call#

Remove the @DBOS.scheduled() decorator and create the schedule dynamically using DBOS.create_schedule():

# Create schedule at application startup or via API endpoint
DBOS.create_schedule(
    schedule_name="my-schedule",
    workflow_fn=my_workflow,
    schedule="*/5 * * * * *", # Note: seconds are supported
    context={"env": "production", "version": "1.0"}, # Any JSON-serializable value
    automatic_backfill=False, # Optional: enable automatic backfill on startup
    cron_timezone=None, # Optional: IANA timezone (e.g., "America/New_York")
    queue_name=None, # Optional: name of a declared queue for concurrency management
)

Step 3: Handle Schedule Persistence#

Important persistence behavior:

  • Decorator schedules: Removing the decorator stops the schedule. The schedule only exists in code.
  • Dynamic schedules: Removing the create_schedule() call does NOT stop the schedule. Schedules persist in the database and survive restarts.

To remove a dynamic schedule, explicitly call:

DBOS.delete_schedule("my-schedule")

Step 4: Use New Management APIs#

The dynamic scheduling API provides extensive management capabilities:

# List all schedules
schedules = DBOS.list_schedules()
for schedule in schedules:
    print(f"{schedule.schedule_name}: {schedule.schedule}")

# Pause a schedule (stops execution but keeps config)
DBOS.pause_schedule("my-schedule")

# Resume a paused schedule
DBOS.resume_schedule("my-schedule")

# Trigger immediate execution (ignores cron schedule)
DBOS.trigger_schedule("my-schedule")

# Delete a schedule permanently
DBOS.delete_schedule("my-schedule")

# Backfill missed executions over a date range
DBOS.backfill_schedule(
    schedule_name="my-schedule",
    start_time=datetime(2026, 1, 1),
    end_time=datetime(2026, 1, 31),
)

# Atomic update of multiple schedules
DBOS.apply_schedules([
    ScheduleInput(
        schedule_name="schedule-1",
        workflow_fn=workflow_a,
        schedule="0 * * * *",
        context={"id": 1},
        automatic_backfill=False,
        cron_timezone=None,
        queue_name=None, # Optional: enqueue to a declared queue
    ),
    ScheduleInput(
        schedule_name="schedule-2",
        workflow_fn=workflow_b,
        schedule="0 */2 * * *",
        context={"id": 2},
        automatic_backfill=True,
        cron_timezone="America/New_York",
        queue_name="high-priority-queue", # Must be declared before use
    ),
])

Queue Assignment for Concurrency Management:

Dynamic schedules support an optional queue_name parameter to enqueue scheduled workflows to specific declared queues. This enables concurrency management for scheduled tasks.

from dbos import DBOS, Queue

# Declare a queue for rate-limited workflows
rate_limited_queue = Queue("rate-limited", concurrency=5)

# Schedule workflows to use the queue
DBOS.create_schedule(
    schedule_name="rate-limited-task",
    workflow_fn=my_workflow,
    schedule="*/5 * * * * *",
    context={"task": "batch-processing"},
    queue_name="rate-limited", # Must be declared before use
)

Notes:

  • The queue must be declared (via Queue("name")) before being referenced in a schedule
  • If queue_name is None or omitted, scheduled workflows use the internal queue
  • The ScheduleInput TypedDict includes an optional queue_name: Optional[str] field

Step 5: Understand Application Version Management#

Dynamic schedules interact with DBOS application versioning to ensure scheduled workflows execute on the latest code:

Key Behavior: Dynamically scheduled workflows are always enqueued to the latest application version. This means when a scheduled workflow fires, it will run on the most recent version of your application code, not necessarily the version that created the schedule.

Version Management APIs:

# List all application versions (newest first)
versions = DBOS.list_application_versions()
for version in versions:
    print(f"{version['version_name']}: {version['version_timestamp']}")

# Get the latest application version
latest = DBOS.get_latest_application_version()
print(f"Latest version: {latest['version_name']}")

# Set a specific version as latest (updates its timestamp)
DBOS.set_latest_application_version("v1.2.3")

Version Tracking:

  • Application versions are automatically computed based on workflow source code and recorded in the database when the executor starts
  • In certain deployment environments where source code cannot be inspected, DBOS will fall back to "DEFAULT_VERSION" and log a warning
  • If you see this warning, set a custom version through the application_version field in DBOSConfig to ensure proper version tracking across deployments
  • If an executor is not running the latest version, it prints a warning on startup
  • This ensures you're aware when running older code in production
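If you see the DEFAULT_VERSION warning, the fix is to pin the version explicitly. A minimal sketch, assuming the application_version field described above (check your DBOS release for the exact DBOSConfig shape; "my-app" and "v1.2.3" are placeholders):

```python
from dbos import DBOS, DBOSConfig

# Pin the version instead of relying on source-code inspection.
config: DBOSConfig = {
    "name": "my-app",
    "application_version": "v1.2.3",
}
DBOS(config=config)  # the pinned version is recorded when the executor starts
```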

Why This Matters for Scheduled Workflows:
When you deploy a new version of your application with updated workflow code, existing dynamic schedules will automatically start executing the new code without requiring any schedule updates. This is a major advantage over decorator-based scheduling, where the schedule is tied to the code version.

When to Migrate#

Migrate to Dynamic Scheduling If You Need:#

  • Runtime schedule modifications without redeployment
  • Schedule persistence across application restarts
  • Pause/resume capabilities for maintenance windows
  • Backfill functionality to replay missed executions
  • Custom context data passed to workflows
  • Dynamic schedule creation based on business logic or user input
  • Automatic execution on the latest application version when deploying updates
  • Timezone-aware scheduling for business hours in specific regions
  • Automatic backfill on startup to recover from downtime
  • Queue assignment for concurrency management of scheduled workflows

Keep Decorator Scheduling If:#

  • Schedules are static and never change
  • You want tight coupling between code and schedule definition
  • You don't need context parameters (just timing information)
  • You prefer simplicity over flexibility

Complete Example: Before and After#

Before: Decorator-Based#

from datetime import datetime
from dbos import DBOS

@DBOS.scheduled("*/5 * * * * *")
@DBOS.workflow()
def data_sync_workflow(scheduled: datetime, actual: datetime) -> None:
    """Syncs data every 5 seconds"""
    print(f"Running data sync at {actual}")
    # Sync logic here

@DBOS.scheduled("0 0 * * *")
@DBOS.workflow()
def daily_report(scheduled: datetime, actual: datetime) -> None:
    """Generates daily report at midnight"""
    print(f"Generating report for {scheduled.date()}")
    # Report generation logic

After: Dynamic Scheduling#

from datetime import datetime, timezone
from typing import Any
from dbos import DBOS

@DBOS.workflow()
def data_sync_workflow(scheduled_time: datetime, context: Any) -> None:
    """Syncs data every 5 seconds"""
    actual_time = datetime.now(timezone.utc)
    env = context.get("env", "unknown")

    print(f"Running data sync in {env} environment at {actual_time}")
    # Sync logic here

@DBOS.workflow()
def daily_report(scheduled_time: datetime, context: Any) -> None:
    """Generates daily report at midnight"""
    report_type = context.get("report_type", "standard")

    print(f"Generating {report_type} report for {scheduled_time.date()}")
    # Report generation logic

# Create schedules at application startup
def initialize_schedules():
    DBOS.create_schedule(
        schedule_name="data-sync",
        workflow_fn=data_sync_workflow,
        schedule="*/5 * * * * *",
        context={"env": "production", "priority": "high"},
    )

    DBOS.create_schedule(
        schedule_name="daily-report",
        workflow_fn=daily_report,
        schedule="0 0 * * *",
        context={"report_type": "comprehensive", "email": True},
    cron_timezone="America/New_York", # "0 0 * * *" fires at midnight Eastern, not UTC
        automatic_backfill=True, # Ensure no reports are missed
    )

# Later: Pause for maintenance
DBOS.pause_schedule("data-sync")

# Resume after maintenance
DBOS.resume_schedule("data-sync")

# Backfill missed reports
DBOS.backfill_schedule(
    schedule_name="daily-report",
    start_time=datetime(2026, 2, 1),
    end_time=datetime(2026, 2, 10),
)

Key Implementation Details#

Reliability Improvements#

PR #484 (October 3, 2025) added critical reliability features to the scheduling system.

Class Method Support#

PR #589 (February 18, 2026) added support for:

  • Scheduling static class methods
  • workflow_class_name parameter in schedule APIs
  • Database schema updates for class-based workflows

Async Support#

PR #493 (October 20, 2025) enabled async workflow functions to be scheduled.
