Migration Guide: Decorator-Based to Dynamic Workflow Scheduling#
Overview#
PR #581 (merged February 12, 2026) introduced a major rework of DBOS Python's scheduling system with a hybrid architecture supporting both:
- Static scheduling via @scheduled() decorator (compile-time, code-based)
- Dynamic scheduling via DBOS.create_schedule() (runtime, database-persisted)
This guide helps you migrate from the old decorator-based pattern to the new dynamic scheduling API.
Critical Breaking Change: Function Signature#
This is the most important difference between the two approaches:
Old Decorator Approach#
Functions take (scheduled_time: datetime, actual_time: datetime)
The second parameter is the actual invocation time passed as datetime.now(timezone.utc).
New Dynamic Approach#
Functions take (scheduled_time: datetime, context: Any)
The second parameter is a custom context object that can be any JSON-serializable value.
Comparison Table#
| Aspect | @DBOS.scheduled() (Old) | DBOS.create_schedule() (New) |
|---|---|---|
| Function Signature | (datetime, datetime) | (datetime, Any) |
| Second Parameter | Actual invocation time | Custom context object |
| Context Support | None | Full support for any JSON-serializable value |
| Definition Time | Compile-time via decorator | Runtime via API call |
| Persistence | Code-only | Database-persisted |
| Survives Restart | No (requires code) | Yes (stored in DB) |
| Runtime Modification | Requires code change | Yes, via API |
| Lifecycle Management | Limited | Pause/resume/delete/backfill/trigger |
| Version Management | Tied to code version | Always runs on latest version |
| Automatic Backfill | No | Optional (on startup) |
| Timezone Support | UTC only | Configurable (IANA timezone names) |
| Queue Assignment | No | Optional queue_name parameter for concurrency management |
| Async Support | Yes | Yes |
| Class Method Support | Yes | Yes (via PR #589) |
Migration Steps#
Step 1: Update Function Signatures#
Before (Decorator Pattern):
@DBOS.scheduled("*/5 * * * * *")
@DBOS.workflow()
def my_workflow(scheduled: datetime, actual: datetime) -> None:
# The second parameter is the actual execution time
logger.info(f"Scheduled for {scheduled}, actually ran at {actual}")
# ... workflow logic
After (Dynamic Pattern):
@DBOS.workflow()
def my_workflow(scheduled_time: datetime, context: Any) -> None:
# The second parameter is now a custom context object
# If you need actual time, capture it manually
actual_time = datetime.now(timezone.utc)
logger.info(f"Scheduled for {scheduled_time}, context: {context}")
logger.info(f"Actually ran at {actual_time}")
# ... workflow logic
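If you have many workflows still using the old `(scheduled, actual)` signature, a thin adapter can bridge them to the new `(scheduled_time, context)` signature during migration. This is a sketch, not part of the DBOS API; `adapt_legacy` is a hypothetical helper name:

```python
from datetime import datetime, timezone
from functools import wraps
from typing import Any, Callable

def adapt_legacy(fn: Callable[[datetime, datetime], None]) -> Callable[[datetime, Any], None]:
    """Wrap an old-style (scheduled, actual) workflow so it accepts the
    new (scheduled_time, context) signature. The actual invocation time
    is captured at call time, matching the old decorator's behavior."""
    @wraps(fn)
    def wrapper(scheduled_time: datetime, context: Any) -> None:
        fn(scheduled_time, datetime.now(timezone.utc))
    return wrapper

# Reuse an unmodified legacy function with the new signature
def legacy_sync(scheduled: datetime, actual: datetime) -> None:
    print(f"Scheduled for {scheduled}, ran at {actual}")

new_style_sync = adapt_legacy(legacy_sync)
```

In real use you would still apply `@DBOS.workflow()` to the wrapped function and pass it as `workflow_fn`; note that the custom context is discarded by this adapter.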
Step 2: Replace Decorator with API Call#
Remove the @DBOS.scheduled() decorator and create the schedule dynamically using DBOS.create_schedule():
# Create schedule at application startup or via API endpoint
DBOS.create_schedule(
schedule_name="my-schedule",
workflow_fn=my_workflow,
schedule="*/5 * * * * *", # Note: seconds are supported
context={"env": "production", "version": "1.0"}, # Any JSON-serializable value
automatic_backfill=False, # Optional: enable automatic backfill on startup
cron_timezone=None, # Optional: IANA timezone (e.g., "America/New_York")
queue_name=None, # Optional: name of a declared queue for concurrency management
)
Step 3: Handle Schedule Persistence#
Important persistence behavior:
- Decorator schedules: Removing the decorator stops the schedule. The schedule only exists in code.
- Dynamic schedules: Removing the create_schedule() call does NOT stop the schedule. Schedules persist in the database and survive restarts.
To remove a dynamic schedule, explicitly call:
DBOS.delete_schedule("my-schedule")
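Because schedules outlive the code that created them, startup code often needs to reconcile what is in the database against what the application currently wants. The set logic is simple; a sketch (the `desired` names are illustrative, and the DBOS calls are shown as comments since `apply_schedules` may also cover this case atomically):

```python
def stale_schedules(existing: set[str], desired: set[str]) -> set[str]:
    """Schedule names present in the database but no longer wanted in code."""
    return existing - desired

# Usage sketch against the dynamic scheduling API:
#   existing = {s.schedule_name for s in DBOS.list_schedules()}
#   for name in stale_schedules(existing, {"data-sync", "daily-report"}):
#       DBOS.delete_schedule(name)

print(stale_schedules({"data-sync", "old-report"}, {"data-sync"}))  # {'old-report'}
```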
Step 4: Use New Management APIs#
The dynamic scheduling API provides extensive management capabilities:
# List all schedules
schedules = DBOS.list_schedules()
for schedule in schedules:
print(f"{schedule.schedule_name}: {schedule.schedule}")
# Pause a schedule (stops execution but keeps config)
DBOS.pause_schedule("my-schedule")
# Resume a paused schedule
DBOS.resume_schedule("my-schedule")
# Trigger immediate execution (ignores cron schedule)
DBOS.trigger_schedule("my-schedule")
# Delete a schedule permanently
DBOS.delete_schedule("my-schedule")
# Backfill missed executions over a date range
DBOS.backfill_schedule(
schedule_name="my-schedule",
start_time=datetime(2026, 1, 1),
end_time=datetime(2026, 1, 31),
)
# Atomic update of multiple schedules
DBOS.apply_schedules([
ScheduleInput(
schedule_name="schedule-1",
workflow_fn=workflow_a,
schedule="0 * * * *",
context={"id": 1},
automatic_backfill=False,
cron_timezone=None,
queue_name=None, # Optional: enqueue to a declared queue
),
ScheduleInput(
schedule_name="schedule-2",
workflow_fn=workflow_b,
schedule="0 */2 * * *",
context={"id": 2},
automatic_backfill=True,
cron_timezone="America/New_York",
queue_name="high-priority-queue", # Must be declared before use
),
])
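Conceptually, a backfill enumerates the scheduled_time values the cron expression would have produced between start_time and end_time and enqueues one run per tick. For a simple fixed-interval schedule, that enumeration looks like the following sketch (the real scheduler evaluates the full cron expression; `backfill_ticks` is illustrative only):

```python
from datetime import datetime, timedelta

def backfill_ticks(start: datetime, end: datetime, interval: timedelta) -> list[datetime]:
    """Scheduled times a fixed-interval schedule would have fired in [start, end)."""
    ticks = []
    t = start
    while t < end:
        ticks.append(t)
        t += interval
    return ticks

# A daily schedule backfilled over ten days yields ten scheduled_time values:
ticks = backfill_ticks(datetime(2026, 2, 1), datetime(2026, 2, 11), timedelta(days=1))
print(len(ticks))  # 10
```

Each tick becomes the `scheduled_time` argument of one backfilled workflow invocation.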
Queue Assignment for Concurrency Management:
Dynamic schedules support an optional queue_name parameter to enqueue scheduled workflows to specific declared queues. This enables concurrency management for scheduled tasks.
from dbos import DBOS, Queue
# Declare a queue for rate-limited workflows
rate_limited_queue = Queue("rate-limited", concurrency=5)
# Schedule workflows to use the queue
DBOS.create_schedule(
schedule_name="rate-limited-task",
workflow_fn=my_workflow,
schedule="*/5 * * * * *",
context={"task": "batch-processing"},
queue_name="rate-limited", # Must be declared before use
)
Notes:
- The queue must be declared (via Queue("name")) before being referenced in a schedule
- If queue_name is None or omitted, scheduled workflows use the internal queue
- The ScheduleInput TypedDict includes an optional queue_name: Optional[str] field
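Conceptually, a queue declared with `concurrency=5` admits at most five workflows at once; additional scheduled runs wait their turn. A toy illustration of that semantics using a semaphore (this is not how DBOS implements queues, which are database-backed; it only models the admission limit):

```python
import threading
import time

limit = threading.Semaphore(5)   # analogous to Queue("rate-limited", concurrency=5)
active = 0
peak = 0
lock = threading.Lock()

def scheduled_task() -> None:
    global active, peak
    with limit:                  # at most 5 tasks run this section concurrently
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)         # simulated work
        with lock:
            active -= 1

# 20 scheduled firings contend for 5 concurrency slots
threads = [threading.Thread(target=scheduled_task) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak <= 5)  # True
```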
Step 5: Understand Application Version Management#
Dynamic schedules interact with DBOS application versioning to ensure scheduled workflows execute on the latest code:
Key Behavior: Dynamically scheduled workflows are always enqueued to the latest application version. This means when a scheduled workflow fires, it will run on the most recent version of your application code, not necessarily the version that created the schedule.
Version Management APIs:
# List all application versions (newest first)
versions = DBOS.list_application_versions()
for version in versions:
print(f"{version['version_name']}: {version['version_timestamp']}")
# Get the latest application version
latest = DBOS.get_latest_application_version()
print(f"Latest version: {latest['version_name']}")
# Set a specific version as latest (updates its timestamp)
DBOS.set_latest_application_version("v1.2.3")
Version Tracking:
- Application versions are automatically computed based on workflow source code and recorded in the database when the executor starts
- In certain deployment environments where source code cannot be inspected, DBOS will fall back to "DEFAULT_VERSION" and log a warning
- If you see this warning, set a custom version through the application_version field in DBOSConfig to ensure proper version tracking across deployments
- If an executor is not running the latest version, it prints a warning on startup
- This ensures you're aware when running older code in production
Why This Matters for Scheduled Workflows:
When you deploy a new version of your application with updated workflow code, existing dynamic schedules will automatically start executing the new code without requiring any schedule updates. This is a major advantage over decorator-based scheduling, where the schedule is tied to the code version.
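Since set_latest_application_version works by updating a version's timestamp, "latest" reduces to picking the version with the newest timestamp. A sketch of that selection rule over the dict shape returned by list_application_versions above (the sample data is illustrative):

```python
from datetime import datetime, timezone

def latest_version(versions: list[dict]) -> str:
    """Pick the version with the newest timestamp (the one schedules enqueue to)."""
    return max(versions, key=lambda v: v["version_timestamp"])["version_name"]

versions = [
    {"version_name": "v1.2.2", "version_timestamp": datetime(2026, 2, 1, tzinfo=timezone.utc)},
    {"version_name": "v1.2.3", "version_timestamp": datetime(2026, 2, 15, tzinfo=timezone.utc)},
]
print(latest_version(versions))  # v1.2.3
```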
When to Migrate#
Migrate to Dynamic Scheduling If You Need:#
- Runtime schedule modifications without redeployment
- Schedule persistence across application restarts
- Pause/resume capabilities for maintenance windows
- Backfill functionality to replay missed executions
- Custom context data passed to workflows
- Dynamic schedule creation based on business logic or user input
- Automatic execution on the latest application version when deploying updates
- Timezone-aware scheduling for business hours in specific regions
- Automatic backfill on startup to recover from downtime
- Queue assignment for concurrency management of scheduled workflows
Keep Decorator Scheduling If:#
- Schedules are static and never change
- You want tight coupling between code and schedule definition
- You don't need context parameters (just timing information)
- You prefer simplicity over flexibility
Complete Example: Before and After#
Before: Decorator-Based#
from datetime import datetime
from dbos import DBOS
@DBOS.scheduled("*/5 * * * * *")
@DBOS.workflow()
def data_sync_workflow(scheduled: datetime, actual: datetime) -> None:
"""Syncs data every 5 seconds"""
print(f"Running data sync at {actual}")
# Sync logic here
@DBOS.scheduled("0 0 * * *")
@DBOS.workflow()
def daily_report(scheduled: datetime, actual: datetime) -> None:
"""Generates daily report at midnight"""
print(f"Generating report for {scheduled.date()}")
# Report generation logic
After: Dynamic Scheduling#
from datetime import datetime, timezone
from typing import Any
from dbos import DBOS
@DBOS.workflow()
def data_sync_workflow(scheduled_time: datetime, context: Any) -> None:
"""Syncs data every 5 seconds"""
actual_time = datetime.now(timezone.utc)
env = context.get("env", "unknown")
print(f"Running data sync in {env} environment at {actual_time}")
# Sync logic here
@DBOS.workflow()
def daily_report(scheduled_time: datetime, context: Any) -> None:
"""Generates daily report at midnight"""
report_type = context.get("report_type", "standard")
print(f"Generating {report_type} report for {scheduled_time.date()}")
# Report generation logic
# Create schedules at application startup
def initialize_schedules():
DBOS.create_schedule(
schedule_name="data-sync",
workflow_fn=data_sync_workflow,
schedule="*/5 * * * * *",
context={"env": "production", "priority": "high"},
)
DBOS.create_schedule(
schedule_name="daily-report",
workflow_fn=daily_report,
schedule="0 0 * * *",
context={"report_type": "comprehensive", "email": True},
cron_timezone="America/New_York",  # Fires at midnight Eastern, not midnight UTC
automatic_backfill=True, # Ensure no reports are missed
)
# Later: Pause for maintenance
DBOS.pause_schedule("data-sync")
# Resume after maintenance
DBOS.resume_schedule("data-sync")
# Backfill missed reports
DBOS.backfill_schedule(
schedule_name="daily-report",
start_time=datetime(2026, 2, 1),
end_time=datetime(2026, 2, 10),
)
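When cron_timezone is set, "0 0 * * *" fires at local midnight, which drifts against UTC across daylight-saving transitions. DBOS handles this internally, but you can check what a local fire time means in UTC with the standard library (a sketch independent of DBOS):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

eastern = ZoneInfo("America/New_York")
# Local midnight in winter (EST, UTC-5) vs. summer (EDT, UTC-4)
winter = datetime(2026, 2, 1, 0, 0, tzinfo=eastern)
summer = datetime(2026, 7, 1, 0, 0, tzinfo=eastern)
print(winter.astimezone(ZoneInfo("UTC")))  # 2026-02-01 05:00:00+00:00
print(summer.astimezone(ZoneInfo("UTC")))  # 2026-07-01 04:00:00+00:00
```

The same local-midnight schedule therefore corresponds to two different UTC times over the course of a year.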
Key Implementation Details#
Reliability Improvements#
PR #484 (October 3, 2025) added critical reliability features:
- Jitter of up to 10% of sleep time (capped at 10 seconds) to prevent thundering herd
- Duplicate execution prevention in distributed environments
- Dynamic queue retrieval from the registry
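The jitter rule above can be expressed directly: a random offset of up to 10% of the sleep interval, never more than 10 seconds. A sketch of that calculation (the helper name is illustrative, not the DBOS internal):

```python
import random

def jittered_sleep(sleep_seconds: float) -> float:
    """Add up to 10% random jitter, capped at 10 seconds, so executors that
    wake simultaneously spread out instead of stampeding (thundering herd)."""
    max_jitter = min(0.1 * sleep_seconds, 10.0)
    return sleep_seconds + random.uniform(0, max_jitter)

# For a 300 s sleep, jitter is capped at 10 s rather than the uncapped 30 s:
s = jittered_sleep(300)
print(300 <= s <= 310)  # True
```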
Class Method Support#
PR #589 (February 18, 2026) added support for:
- Scheduling static class methods
- workflow_class_name parameter in schedule APIs
- Database schema updates for class-based workflows
Async Support#
PR #493 (October 20, 2025) enabled:
- Async scheduled workflows for both patterns
- Better handling of I/O-bound tasks in scheduled workflows
Related Pull Requests#
- PR #581 - Workflow Schedules (Feb 12, 2026) - Main scheduling rework
- PR #625 - Allow Scheduling to Queues (Mar 2026) - Queue assignment support for scheduled workflows
- PR #611 - Improved Scheduler (Mar 2026) - Automatic backfill, timezone support, last_fired_at tracking
- PR #598 - Improved Versioning Support (Feb 2026) - Application version management APIs
- PR #484 - Better Scheduler (Oct 3, 2025) - Reliability improvements
- PR #589 - Support Classes (Feb 18, 2026) - Class method support
- PR #493 - Python Fixes (Oct 20, 2025) - Async support