Distributed Scheduler Coordination
Distributed Scheduler Coordination refers to the mechanisms and architectural patterns used to coordinate multiple scheduler instances in distributed computing environments. In the context of DBOS Python, these mechanisms enable reliable, exactly-once execution of scheduled workflows across multiple application instances without requiring external coordination services. The system employs database-centric coordination through a shared PostgreSQL system database, jitter-based load distribution, and deterministic idempotency keys to prevent duplicate task execution.
The coordination architecture addresses fundamental challenges in distributed scheduling: the "thundering herd" problem where multiple instances simultaneously attempt to execute the same scheduled task, duplicate execution prevention across independent schedulers, and reliable failover when instances crash or restart. Introduced through PR #484 in October 2025, these coordination mechanisms specifically target distributed deployment scenarios where multiple application servers must cooperate to execute a shared schedule of workflows.
Unlike traditional distributed schedulers that rely on external coordination services like Apache ZooKeeper or etcd, DBOS employs a polling-based architecture where all instances connect to the same PostgreSQL database, treating the database as the single source of truth for schedule state and execution history. This design ensures that no individual scheduler instance is a single point of failure, while database transaction isolation guarantees consistency.
Architecture
Database-Centric Coordination
The distributed scheduler architecture centers on a shared PostgreSQL system database that serves as the coordination layer for all scheduler instances. All application instances connect to the same database and periodically poll for scheduled work, eliminating the need for external coordination infrastructure. Schedules are persisted in the workflow_schedules table with fields including schedule_id, schedule_name, workflow_name, cron expression, status, serialized context, last_fired_at timestamp, automatic_backfill flag, optional cron_timezone, and optional queue_name.
This database-centric approach provides several architectural advantages. First, it leverages PostgreSQL's ACID guarantees to ensure consistency across distributed instances. Second, schedule changes reach all workers through the polling mechanism within one polling interval, enabling dynamic schedule management without configuration synchronization. Third, no scheduler instance is a single point of failure: any instance can manage schedules, and the remaining instances keep executing them if one crashes.
Threading Model
Each schedule is managed by a dedicated daemon thread that runs independently of other schedules. This thread-per-schedule model enables concurrent management of multiple schedules within a single application instance, with each thread responsible for calculating the next execution time, applying jitter, checking for duplicate execution, and enqueueing the workflow.
Thread lifecycle is dynamic: threads are created when schedules become active and destroyed when schedules are deleted or paused. Each thread uses a stop event for clean shutdown, allowing graceful termination during application shutdown or schedule deactivation.
Jitter-Based Load Distribution
Problem: Thundering Herd
In distributed environments where multiple scheduler instances run the same schedule, a "thundering herd" problem emerges when all instances wake simultaneously at the exact scheduled time. Without coordination, every instance would attempt to execute the scheduled workflow at precisely the same moment, causing:
- Concentrated load spikes on shared resources (database, queues, external services)
- Race conditions as multiple instances compete to claim execution rights
- Wasted CPU cycles as multiple instances perform duplicate idempotency checks
- Potential for cascade failures under high load conditions
Solution: Random Jitter
Introduced in PR #484 in October 2025, the jitter mechanism adds randomized delays to prevent simultaneous execution attempts. The implementation applies jitter of up to 10% of the sleep time, capped at 10 seconds, distributing execution attempts across a time window rather than concentrating them at a single instant.
The jitter calculation follows this algorithm:
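```python
import random

# From _scheduler.py lines 61-66 (excerpt from a schedule thread's run loop)
sleep_time = max(0, sleep_time)        # a late wake-up never yields a negative sleep
max_jitter = min(sleep_time / 10, 10)  # up to 10% of the sleep, capped at 10 seconds
jitter = random.uniform(0, max_jitter)
if self._stop_event.wait(timeout=sleep_time + jitter):
    return  # stop event was set during the wait: exit the thread cleanly
```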
For example, with a 60-second sleep interval between executions, max_jitter is 6 seconds (10% of 60), so each instance adds a random delay between 0 and 6 seconds. For any sleep longer than 100 seconds, such as a 1-hour interval, the 10-second cap applies. This keeps execution timing reasonably precise.
This jitter is applied in both the decorator-based scheduler and the dynamic scheduler, ensuring consistent behavior across both scheduling modes.
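The jitter rule can be checked in isolation with the short sketch below. It is not DBOS code. The function name jittered_sleep and the optional rng parameter (added so runs are reproducible) are illustrative assumptions. Only the max/min/uniform arithmetic comes from the excerpt.

```python
from __future__ import annotations

import random


def jittered_sleep(sleep_time: float, rng: random.Random | None = None) -> tuple[float, float]:
    """Hypothetical helper: return (max_jitter, total_wait) for one wake-up.

    Jitter is drawn uniformly from [0, min(10% of the sleep time, 10 seconds)].
    """
    if rng is None:
        rng = random.Random()
    sleep_time = max(0.0, sleep_time)        # clamp negative sleeps from late wake-ups
    max_jitter = min(sleep_time / 10, 10.0)  # 10% of the sleep, capped at 10 s
    return max_jitter, sleep_time + rng.uniform(0, max_jitter)


# The 10-second cap takes over once the sleep exceeds 100 seconds.
for sleep in (60, 100, 3600):
    max_j, total = jittered_sleep(sleep, random.Random(0))
    print(f"sleep={sleep:>5}s  max_jitter={max_j:4.1f}s  wait={total:.2f}s")
```

Drawing from a uniform distribution spreads the instances evenly across the window. Each instance still wakes at or after its nominal time, never before it.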
Duplicate Prevention
Idempotency Key Mechanism
Duplicate prevention was introduced in PR #484 specifically for distributed environments. The mechanism constructs deterministic workflow IDs from the schedule name and scheduled execution time. This ensures that each scheduled invocation is enqueued only once, regardless of how many instances attempt to execute it.
The workflow ID format varies by scheduler type:
- Dynamic scheduler: f"sched-{schedule_name}-{next_exec_time.isoformat()}"
- Decorator scheduler: f"sched-{func_name}-{next_exec_time.isoformat()}"
- Manual trigger: f"sched-{schedule_name}-trigger-{now.isoformat()}"
The ISO 8601 timestamp format ensures uniqueness across time while remaining deterministic: every instance generates identical workflow IDs for the same scheduled execution.
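The determinism is easy to see by building the IDs directly from the format strings listed above. The helper names scheduled_workflow_id and triggered_workflow_id are hypothetical. The UTC timestamp is just an example input.

```python
from datetime import datetime, timezone


def scheduled_workflow_id(name: str, scheduled_time: datetime) -> str:
    """ID for a cron-driven run; name is the schedule name (or function name for decorators)."""
    return f"sched-{name}-{scheduled_time.isoformat()}"


def triggered_workflow_id(schedule_name: str, now: datetime) -> str:
    """ID for a manual trigger; the '-trigger-' infix keeps it apart from cron runs."""
    return f"sched-{schedule_name}-trigger-{now.isoformat()}"


tick = datetime(2026, 1, 15, 9, 0, tzinfo=timezone.utc)

# Two instances computing the ID for the same tick independently agree exactly.
id_a = scheduled_workflow_id("daily-report", tick)
id_b = scheduled_workflow_id("daily-report", tick)
print(id_a == id_b, id_a)  # True sched-daily-report-2026-01-15T09:00:00+00:00

# A manual trigger at the same instant still gets a distinct ID.
print(triggered_workflow_id("daily-report", tick))  # sched-daily-report-trigger-2026-01-15T09:00:00+00:00
```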
Database Status Check
Before enqueueing a workflow, each scheduler instance performs a database status check:
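```python
workflow_id = f"sched-{self.schedule_name}-{next_exec_time.isoformat()}"
# Skip the enqueue if any instance already created this workflow
if not dbos._sys_db.get_workflow_status(workflow_id):
    # Enqueue under the deterministic ID so every instance targets the same workflow
    with SetWorkflowID(workflow_id):
        scheduler_queue.enqueue(func, next_exec_time, self.context)
```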
The get_workflow_status() method queries the system database for an existing workflow with the given ID. If no record exists, the instance enqueues the workflow, which records its status in the database. If a record already exists because another instance enqueued it first, the method returns that status record. The record is truthy, so the current instance skips the enqueue.
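This database-backed coordination works across all instances sharing the same system database. The status check and the enqueue are separate steps, so two instances can both pass the check at the same moment. Transaction isolation in the database is what prevents both of them from creating the workflow. PostgreSQL provides this guarantee. SQLite had an isolation bug that allowed duplicates until it was fixed in PR #564.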
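A small self-contained model shows why the status check alone is not enough. The table name workflow_status, its two columns, and the try_enqueue helper are hypothetical and do not reflect DBOS's actual schema. The point is that a check-then-insert sequence needs a uniqueness guarantee behind it, here a PRIMARY KEY combined with INSERT OR IGNORE.

```python
import sqlite3

# Hypothetical stand-in for a workflow status table (not DBOS's real schema).
# The PRIMARY KEY on workflow_id is what makes each ID unique.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE workflow_status (workflow_id TEXT PRIMARY KEY, status TEXT)")


def try_enqueue(conn: sqlite3.Connection, workflow_id: str) -> bool:
    """Return True if this caller created the record, False if it already existed."""
    # Fast path: another instance already enqueued this ID.
    found = conn.execute(
        "SELECT 1 FROM workflow_status WHERE workflow_id = ?", (workflow_id,)
    ).fetchone()
    if found:
        return False
    # The check is not atomic with this write, so the key is the final guard.
    cur = conn.execute(
        "INSERT OR IGNORE INTO workflow_status (workflow_id, status) VALUES (?, 'ENQUEUED')",
        (workflow_id,),
    )
    conn.commit()
    return cur.rowcount == 1


wid = "sched-daily-report-2026-01-15T09:00:00+00:00"
results = [try_enqueue(conn, wid) for _ in range(3)]  # three instances, same tick
print(results)  # [True, False, False]

# Even if two instances both passed the SELECT, only one INSERT takes effect.
wid2 = "sched-daily-report-2026-01-16T09:00:00+00:00"
inserted = [
    conn.execute(
        "INSERT OR IGNORE INTO workflow_status (workflow_id, status) VALUES (?, 'ENQUEUED')",
        (wid2,),
    ).rowcount
    for _ in range(2)
]
print(inserted)  # [1, 0]
```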
Scheduler Types
Static (Decorator-Based) Scheduler
The static scheduler provides a declarative, code-defined scheduling approach introduced in PR #581 in February 2026. Developers define schedules using the @DBOS.scheduled decorator:
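```python
from datetime import datetime
from dbos import DBOS

@DBOS.scheduled(cron="0 * * * *")  # Every hour (evaluated in UTC by default)
@DBOS.workflow()
def hourly_report(scheduled_time: datetime, actual_time: datetime) -> None:
    # scheduled_time is the cron tick; actual_time is when the run was enqueued
    pass  # Workflow implementation
```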
The scheduler validates cron expressions at registration time, preventing runtime errors from invalid schedules. Each decorated function gets a single dedicated thread that starts at application initialization and runs until shutdown.
Implementation details are found in _scheduler_decorator.py, which manages the decorator logic and thread lifecycle for static schedules.
Dynamic (API-Based) Scheduler
The dynamic scheduler enables runtime schedule creation, modification, and deletion through programmatic APIs. Schedules created with DBOS.create_schedule() are persisted in the database and survive application restarts, unlike static decorator-based schedules that exist only in code.
The API accepts optional parameters for controlling schedule behavior:
```python
DBOS.create_schedule(
    schedule_name="hourly-report",
    workflow_fn=generate_report,
    schedule="0 * * * *",
    context={"report_type": "summary"},
    automatic_backfill=True,
    cron_timezone="America/New_York",
    queue_name="reports-queue",  # Optional: enqueue to a specific queue
)
```
When queue_name is specified, the schedule validates that the queue is declared. If the queue does not exist in the application's queue registry, DBOS.create_schedule() raises a DBOSException. This validation ensures scheduled workflows are only routed to queues that have been properly configured for concurrency management.
The dynamic scheduler implements a polling loop that periodically checks the database for schedule changes:
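```python
import threading

# Simplified excerpt: dbos, is_active and _ScheduleThread are DBOS internals.
def dynamic_scheduler_loop(stop_event: threading.Event,
                           polling_interval_sec: float) -> None:
    schedule_threads: dict[str, _ScheduleThread] = {}
    while not stop_event.is_set():
        schedules = dbos._sys_db.list_schedules()
        current_ids = {s["schedule_id"] for s in schedules}
        # Stop threads for schedules deleted from the database
        for schedule_id in list(schedule_threads.keys()):
            if schedule_id not in current_ids:
                schedule_threads[schedule_id].stop()
                del schedule_threads[schedule_id]
        # Start or stop threads based on each schedule's status
        for schedule in schedules:
            schedule_id = schedule["schedule_id"]
            if not is_active(schedule) and schedule_id in schedule_threads:
                schedule_threads[schedule_id].stop()  # schedule was paused
                del schedule_threads[schedule_id]
            elif is_active(schedule) and schedule_id not in schedule_threads:
                schedule_threads[schedule_id] = _ScheduleThread(schedule)
        # Sleep until the next poll; wakes immediately if shutdown is requested
        stop_event.wait(timeout=polling_interval_sec)
    # On shutdown, stop every remaining schedule thread
    for thread in schedule_threads.values():
        thread.stop()
```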
Dynamic schedules support pause/resume/delete operations, allowing runtime control without code changes or redeployments. Each active schedule gets its own thread managed by the polling loop.
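The start/stop decisions made on each poll reduce to two set differences. The sketch below expresses them as a pure function so they can be tested without threads or a database. The reconcile name and the row shape (a "schedule_id" and a "status" of "ACTIVE" or "PAUSED") are assumptions for illustration.

```python
from __future__ import annotations


def reconcile(running: set[str], schedules: list[dict]) -> tuple[set[str], set[str]]:
    """Hypothetical helper: compute which schedule threads to start and stop.

    running:   IDs of schedules that currently have a thread.
    schedules: rows read from the database on this poll.
    Returns (to_start, to_stop).
    """
    active = {s["schedule_id"] for s in schedules if s["status"] == "ACTIVE"}
    to_start = active - running  # newly created or resumed schedules
    to_stop = running - active   # deleted (absent) or paused schedules
    return to_start, to_stop


rows = [
    {"schedule_id": "a", "status": "ACTIVE"},
    {"schedule_id": "b", "status": "PAUSED"},
    {"schedule_id": "d", "status": "ACTIVE"},
]
# "a" keeps running, "b" was paused, "c" was deleted, "d" is new.
to_start, to_stop = reconcile({"a", "b", "c"}, rows)
print(sorted(to_start), sorted(to_stop))  # ['d'] ['b', 'c']
```

Deleted and paused schedules land in the same to_stop set. A deleted schedule simply no longer appears in the rows, so it can never be in active.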
Advanced Features
Backfill Operations
The backfill_schedule() function enables retroactive execution of missed scheduled workflows. This is particularly useful after extended downtime, schedule pauses, or when catching up after fixing a bug:
```python
from datetime import datetime
from dbos import DBOS

workflow_ids = DBOS.backfill_schedule(
    schedule_name="daily-report",
    start=datetime(2026, 1, 1),
    end=datetime(2026, 1, 31),
)
```
The backfill operation iterates through the cron schedule between the start and end times, generating the same deterministic workflow IDs that would have been created during normal execution. The idempotency check automatically skips already-executed times, so backfilling a partially-executed range is safe. The function returns a list of workflow IDs for tracking execution status.
When a schedule has a configured timezone via cron_timezone, the backfill operation evaluates the cron expression in that timezone, ensuring consistency with normal schedule execution.
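The backfill loop can be sketched without croniter by passing in any "next fire time" function. This is a model of the behavior described above, not DBOS's implementation. The backfill and next_midnight names are hypothetical. The half-open range [start, end) is an assumption about boundary handling. Adding to a set stands in for enqueueing.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Callable


def backfill(
    schedule_name: str,
    next_fire: Callable[[datetime], datetime],
    start: datetime,
    end: datetime,
    existing_ids: set[str],
) -> list[str]:
    """Hypothetical backfill over the half-open range [start, end).

    next_fire(t) returns the first fire time strictly after t (croniter's
    get_next plays this role in DBOS). Returns the ID of every fire time in
    the range, whether it was newly enqueued or had already run.
    """
    ids: list[str] = []
    t = next_fire(start - timedelta(microseconds=1))  # a fire time equal to start counts
    while t < end:
        wid = f"sched-{schedule_name}-{t.isoformat()}"  # same ID as a normal run
        if wid not in existing_ids:  # idempotency check: skip runs that already exist
            existing_ids.add(wid)    # stands in for enqueueing the workflow
        ids.append(wid)
        t = next_fire(t)
    return ids


def next_midnight(t: datetime) -> datetime:
    """Stand-in for a daily '0 0 * * *' schedule in UTC."""
    return t.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)


utc = timezone.utc
already_ran = {"sched-daily-report-2026-01-02T00:00:00+00:00"}
ids = backfill("daily-report", next_midnight,
               datetime(2026, 1, 1, tzinfo=utc), datetime(2026, 1, 4, tzinfo=utc),
               already_ran)
print(len(ids), len(already_ran))  # 3 3
```

Jan 2 already existed, so only Jan 1 and Jan 3 are added. Because the IDs match those of normal runs, running the same backfill twice adds nothing the second time.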
Automatic Backfill
Schedules can be configured with automatic_backfill=True to automatically catch up on missed executions when the scheduler starts up. When enabled, the scheduler uses the last_fired_at timestamp to determine which scheduled times were missed while the service was down or the schedule was paused.
On startup, if a schedule has automatic_backfill enabled and a last_fired_at timestamp exists, the scheduler automatically calls backfill_schedule() with a time range from last_fired_at to the current time. This ensures that scheduled workflows execute even if all application instances were stopped during their scheduled time.
In distributed environments, the automatic backfill coordination works as follows:
- The last_fired_at field is updated in the database each time a workflow successfully executes
- When multiple instances restart simultaneously, the database transaction isolation and idempotency checks prevent duplicate execution of backfilled workflows
- Only one instance will successfully enqueue each backfilled workflow due to the deterministic workflow ID mechanism
Automatic backfill is particularly valuable for schedules that must not skip executions, such as daily reports, billing cycles, or compliance-related workflows. For schedules where missing an execution is acceptable (like periodic cleanup tasks), automatic_backfill can remain disabled (the default).
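The startup decision can be modeled as a small function that returns the range to backfill, or nothing. The function name and its guard against a last_fired_at in the future are assumptions for illustration. The rule itself (enabled flag plus an existing last_fired_at, range ending at the current time) follows the description above.

```python
from __future__ import annotations

from datetime import datetime, timezone


def automatic_backfill_window(
    automatic_backfill: bool,
    last_fired_at: datetime | None,
    now: datetime,
) -> tuple[datetime, datetime] | None:
    """Hypothetical startup check: the (start, end) range to backfill, or None."""
    if not automatic_backfill or last_fired_at is None:
        return None  # feature disabled, or the schedule has never fired
    if last_fired_at >= now:
        return None  # nothing can have been missed
    # Including the tick at last_fired_at is harmless: its workflow ID
    # already exists, so the idempotency check skips it.
    return last_fired_at, now


utc = timezone.utc
now = datetime(2026, 3, 10, 12, 0, tzinfo=utc)
last = datetime(2026, 3, 8, 0, 0, tzinfo=utc)
print(automatic_backfill_window(True, last, now) == (last, now))  # True
print(automatic_backfill_window(False, last, now))                # None
```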
Manual Triggering
The trigger_schedule() function executes a scheduled workflow immediately at the current time, bypassing the cron schedule:
```python
workflow_id = DBOS.trigger_schedule("daily-report")
```
Manual triggers use a distinct workflow ID format (sched-{name}-trigger-{timestamp}) to avoid collisions with regularly scheduled executions. This enables ad-hoc execution for testing, debugging, or handling exceptional circumstances.
Schedule Management
Dynamic schedules support runtime state management through active/paused status stored in the database. The polling loop continuously monitors schedule status, implementing dynamic thread lifecycle management:
- Pause: Sets status to PAUSED, causing the polling loop to stop the schedule's thread
- Resume: Sets status to ACTIVE, causing the polling loop to create a new thread
- Delete: Removes the schedule from the database, triggering thread cleanup
This management system enables schedule control without requiring application restarts or code deployments.
Application Version Management
Introduced in PR #598, DBOS provides APIs for managing application versions recorded in the application_versions database table. These APIs enable runtime version control and coordination:
- list_application_versions(): Returns all registered versions ordered by version_timestamp (most recent first)
- get_latest_application_version(): Returns the version with the highest version_timestamp, which is the version scheduled workflows will be enqueued to
- set_latest_application_version(version_name): Updates a version's version_timestamp to the current time, making it the latest version
Each version record includes a version_id, version_name, version_timestamp (determines which is latest), and created_at timestamp (records when the version was first registered). The version_timestamp can be updated via set_latest_application_version(), while created_at remains immutable.
Application versions are automatically registered when a DBOS instance launches. If the current application version is not the latest, DBOS logs a warning at startup. This version management system ensures scheduled workflows execute on the intended application version, supporting deployment scenarios like gradual rollouts, rollbacks, and A/B testing where multiple versions may coexist.
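The "latest" rule can be modeled in memory. The highest version_timestamp wins, and set_latest_application_version() only moves that timestamp, which is what makes a rollback possible. VersionRegistry and its method names are hypothetical stand-ins for the application_versions table and the API functions listed above.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class AppVersion:
    version_id: str
    version_name: str
    version_timestamp: datetime  # the highest value marks the latest version
    created_at: datetime         # first registration time; never updated


class VersionRegistry:
    """Hypothetical in-memory model of the application_versions table."""

    def __init__(self) -> None:
        self._versions: dict[str, AppVersion] = {}

    def register(self, version: AppVersion) -> None:
        # Re-registering an existing name (e.g. on restart) keeps the original record
        self._versions.setdefault(version.version_name, version)

    def list_versions(self) -> list[AppVersion]:
        return sorted(self._versions.values(), key=lambda v: v.version_timestamp, reverse=True)

    def latest(self) -> AppVersion:
        return max(self._versions.values(), key=lambda v: v.version_timestamp)

    def set_latest(self, version_name: str, now: datetime) -> None:
        # Only version_timestamp moves; created_at is left untouched
        self._versions[version_name].version_timestamp = now


def month(m: int) -> datetime:
    return datetime(2026, m, 1, tzinfo=timezone.utc)


reg = VersionRegistry()
reg.register(AppVersion("1", "v1", month(1), month(1)))
reg.register(AppVersion("2", "v2", month(2), month(2)))
print(reg.latest().version_name)  # v2
reg.set_latest("v1", now=month(3))  # roll back: v1 becomes latest again
print(reg.latest().version_name)  # v1
```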
Reliability Features
Error Handling
The scheduler implements defensive error handling to prevent individual schedule failures from affecting other schedules. Exception handling wraps the workflow enqueueing logic:
```python
import traceback

try:
    workflow_id = f"sched-{self.schedule_name}-{next_exec_time.isoformat()}"
    if not dbos._sys_db.get_workflow_status(workflow_id):
        with SetWorkflowID(workflow_id):
            scheduler_queue.enqueue(func, next_exec_time, self.context)
except Exception:
    # Log and keep the thread alive so later scheduled times still fire
    dbos_logger.warning(
        f"Exception in schedule '{self.schedule_name}': "
        f"{traceback.format_exc()}"
    )
```
Exceptions are logged as warnings but do not terminate the scheduler thread, so later scheduled times still fire. Invalid cron expressions are caught at startup. Schedules that reference unregistered workflow names are skipped with a warning rather than causing a failure. Schedules with undeclared queue names raise a DBOSException when the schedule is created or applied.
Automatic Failover
The architecture provides inherent fault tolerance through its distributed design. Since any instance can manage schedules and there is no single point of failure, when an instance crashes:
- Other instances continue polling the database and executing schedules
- The duplicate prevention mechanism ensures that other instances do not enqueue a second copy of a scheduled run the crashed instance had already enqueued
- DBOS Conductor detects failures through closed websocket connections and coordinates workflow recovery
- Alternatively, manual recovery mechanisms can restart interrupted workflows
This design ensures continuous schedule execution even during instance failures, with the database providing consistency guarantees across the remaining healthy instances.
Transaction Isolation
Database transaction isolation is critical for preventing race conditions during workflow enqueueing. With PostgreSQL, transaction isolation prevents two instances from simultaneously enqueueing the same workflow. SQLite, however, had an isolation bug that allowed duplicate execution in multi-worker setups. That bug was fixed in PR #564.
Implementation Details
Workflow Enqueueing Process
When a scheduler determines a workflow should execute, it follows this process:
- Construct the deterministic workflow ID from the schedule name and execution time
- Query the system database for an existing workflow with that ID
- If no existing workflow is found, call the internal _enqueue_scheduled_workflow() helper function
- Query the application_versions table to get the latest application version
- Determine the target queue: use the schedule's queue_name if specified, otherwise use the internal queue (_dbos_sys_internal)
- Enqueue the workflow with the latest application version and target queue assigned to it
- Pass the scheduled execution time as the first parameter
- For dynamic schedules, pass the deserialized context object as the second parameter
Scheduled workflows are automatically enqueued with the latest application version. Since PR #598, the system queries the application_versions table to find the latest version, meaning the one with the highest version_timestamp, and assigns it to the workflow. Scheduled workflows therefore run on the version currently marked as latest. After a rollback with set_latest_application_version(), that need not be the most recently deployed code. The version tracking system records all deployed application versions in the database, enabling version management through the Application Version API.
Scheduled workflows can be enqueued to either the internal queue or to a specific declared queue. The target queue is determined by the schedule's queue_name parameter. If queue_name is specified, the workflow is enqueued to that queue; otherwise, it defaults to the internal queue. This enables scheduled workflows to participate in queue-based concurrency management and resource allocation strategies.
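The routing and argument rules for a dynamic schedule can be collected into one function. Everything here is a hypothetical model. EnqueueRequest, build_enqueue_request and UndeclaredQueueError (standing in for DBOSException) are illustrative names. The queue name _dbos_sys_internal and the argument order come from the steps listed above.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

INTERNAL_QUEUE = "_dbos_sys_internal"


class UndeclaredQueueError(Exception):
    """Hypothetical stand-in for the DBOSException raised for an undeclared queue."""


@dataclass
class EnqueueRequest:
    workflow_id: str
    queue_name: str
    app_version: str
    args: tuple[Any, ...]


def build_enqueue_request(
    workflow_id: str,
    scheduled_time: datetime,
    context: Any,
    queue_name: str | None,
    declared_queues: set[str],
    latest_version: str,
) -> EnqueueRequest:
    """Assemble the enqueue call for one run of a dynamic schedule."""
    if queue_name is not None and queue_name not in declared_queues:
        raise UndeclaredQueueError(f"queue {queue_name!r} is not declared")
    target = queue_name if queue_name is not None else INTERNAL_QUEUE
    # Scheduled time is the first argument; the schedule's context is the second.
    return EnqueueRequest(workflow_id, target, latest_version, (scheduled_time, context))


when = datetime(2026, 3, 2, 9, 0, tzinfo=timezone.utc)
req = build_enqueue_request(
    f"sched-hourly-report-{when.isoformat()}", when, {"report_type": "summary"},
    "reports-queue", {"reports-queue"}, latest_version="v2",
)
print(req.queue_name, req.app_version)  # reports-queue v2
```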
Cron Expression Parsing
Both scheduler types use the croniter library with the second_at_beginning=True option, so six-field cron expressions are read with the seconds field first. This enables second-level precision in schedule definitions.
Timezone Support
Schedules support configurable timezones through the optional cron_timezone parameter, which accepts IANA timezone names (e.g., "America/New_York", "Europe/London", "Asia/Tokyo"). When specified, the cron expression is evaluated in the configured timezone rather than UTC. This allows schedules to align with local business hours or regulatory requirements regardless of where application instances are deployed.
For example, a schedule with cron="0 9 * * 1-5" and cron_timezone="America/New_York" will fire at 9:00 AM Eastern Time on weekdays, automatically accounting for daylight saving time transitions. The scheduled execution time passed to the workflow function will be in the configured timezone.
When cron_timezone is not specified, schedules default to UTC. This is recommended for schedules that should fire at consistent absolute times regardless of geographic location or daylight saving time, such as system maintenance tasks or global data synchronization.
In distributed environments spanning multiple geographic regions, timezone configuration ensures all instances coordinate execution based on the same timezone interpretation. The database stores the timezone name, and each scheduler instance evaluates the cron expression in that timezone when calculating next execution times.
Execution Time Calculation
The scheduler follows this process:
- Initializes a croniter instance with the current time in the schedule's timezone (UTC if no cron_timezone is configured)
- Calls get_next(datetime) to calculate the next execution time
- Computes the sleep duration as the difference between the next execution time and the current time
- Applies jitter to the sleep duration
- Waits on the stop event with a timeout, allowing clean shutdown
After successful workflow execution, the scheduler updates the last_fired_at timestamp in the database to the execution time, enabling automatic backfill coordination and schedule monitoring.
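The sleep computation and the interruptible wait can be sketched with the standard library alone. Here next_top_of_hour is a hypothetical stand-in for what croniter's get_next would return for "0 * * * *", so the example runs without croniter installed. The threading.Event.wait call is the real standard-library method. It returns True as soon as the event is set, which is how a stop request cuts the sleep short.

```python
from __future__ import annotations

import threading
from datetime import datetime, timedelta, timezone


def next_top_of_hour(now: datetime) -> datetime:
    """Stand-in for croniter("0 * * * *", now).get_next(datetime): first hour boundary after now."""
    return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def plan_next_run(now: datetime) -> tuple[datetime, float]:
    """Return (next_exec_time, sleep_seconds) before jitter is added."""
    next_exec_time = next_top_of_hour(now)
    sleep_time = max(0.0, (next_exec_time - now).total_seconds())
    return next_exec_time, sleep_time


now = datetime(2026, 1, 1, 10, 59, 30, tzinfo=timezone.utc)
next_exec_time, sleep_time = plan_next_run(now)
print(next_exec_time.isoformat(), sleep_time)  # 2026-01-01T11:00:00+00:00 30.0

# A set stop event ends the wait immediately instead of sleeping for the timeout.
stop_event = threading.Event()
stop_event.set()
print(stop_event.wait(timeout=sleep_time))  # True
```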
Polling Configuration
The dynamic scheduler's polling loop accepts a configurable polling_interval_sec parameter that controls how frequently the database is queried for schedule changes. This parameter balances responsiveness (how quickly schedule changes take effect) against database load:
- Shorter intervals (e.g., 5-10 seconds): Near-immediate schedule change propagation, higher database query rate
- Longer intervals (e.g., 60-300 seconds): Reduced database load, delayed schedule change propagation
In practice, the polling interval should be chosen based on schedule management patterns. If schedules change infrequently, longer intervals reduce unnecessary database queries. If dynamic schedule management is common, shorter intervals improve responsiveness.
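The trade-off is straightforward arithmetic. The sketch assumes that every instance polls independently (each one runs its own loop against the shared database). It also treats one poll as one list query, which is a simplification. The function name is illustrative.

```python
def polls_per_day(instances: int, polling_interval_sec: float) -> float:
    """Schedule-list queries per day, assuming every instance polls independently."""
    return instances * 86_400 / polling_interval_sec


for interval in (5, 60, 300):
    print(f"{interval:>3}s interval, 10 instances: "
          f"{polls_per_day(10, interval):>9,.0f} queries/day, "
          f"changes visible within about {interval}s")
```

With 10 instances, a 5-second interval means 172,800 list queries per day, compared with 2,880 at 300 seconds. The worst-case delay before a schedule change takes effect grows in proportion to the interval.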
Evolution and History
PR #484: "Better Scheduler" (October 2025)
PR #484 represented a major milestone in distributed scheduler reliability. The pull request introduced two critical features:
- Jitter-based load distribution: Applied random delays of up to 10% of the sleep time, capped at 10 seconds, to prevent thundering herd problems
- Duplicate prevention: Added database status checks before workflow enqueueing to prevent multiple instances from executing the same scheduled workflow
These changes specifically targeted distributed deployment scenarios, transforming the scheduler from a single-instance design to a multi-instance capable system.
PR #581: "Workflow Schedules" (February 2026)
PR #581 introduced the decorator-based scheduling framework and database persistence for schedules:
- Created _scheduler_decorator.py for the @scheduled decorator implementation
- Added database schema support for storing schedules in the workflow_schedules table
- Implemented schedule management operations (create, pause, resume, delete, list)
- Enabled schedule persistence across application restarts
This pull request established the foundation for dynamic schedule management while maintaining backward compatibility through the decorator-based approach.
PR #589: Class-Based Workflow Support
PR #589 extended scheduler functionality to support class-based workflows, adding an optional workflow_class_name parameter to schedule definitions and corresponding database schema changes. This enabled object-oriented workflow organization while maintaining the same coordination mechanisms.
PR #598: Application Version Tracking (February 2026)
PR #598 introduced comprehensive application version management:
- Created the application_versions table to record all deployed versions with timestamps
- Implemented version management APIs: list_application_versions(), get_latest_application_version(), and set_latest_application_version()
- Modified scheduled workflow enqueueing to automatically assign the latest application version
- Added startup warnings when an executor is not running the latest version
- Improved class-based workflow scheduling to correctly distinguish between static methods and class methods
This pull request ensured scheduled workflows always execute on the intended application version, enabling sophisticated deployment strategies and version coordination across distributed instances.
PR #611: Enhanced Scheduler Features (March 2026)
PR #611 introduced several improvements for production scheduler reliability:
- Last Fired Tracking: Added a last_fired_at timestamp field to track when each schedule last executed successfully, enabling better monitoring and coordination
- Automatic Backfill: Introduced the automatic_backfill parameter, which triggers automatic catch-up of missed executions when the scheduler starts, using last_fired_at to determine the backfill window
- Timezone Support: Added the cron_timezone parameter to evaluate cron expressions in specific IANA timezones rather than always using UTC, enabling schedules aligned with local business hours
These enhancements improved scheduler reliability in distributed environments by providing automatic recovery from downtime and flexible timezone handling for global deployments.
PR #625: Queue-Based Scheduling (March 2026)
PR #625 introduced the ability to route scheduled workflows to declared queues:
- Queue Assignment: Added an optional queue_name parameter to schedule definitions, enabling scheduled workflows to be enqueued to specific queues for concurrency management
- Queue Validation: Implemented validation that raises DBOSException if a schedule specifies an undeclared queue name
- Database Schema: Extended the workflow_schedules table with a queue_name column to persist queue routing configuration
- Backfill and Trigger Support: Ensured backfill operations and manual schedule triggers respect the configured queue assignment
This feature enables scheduled workflows to participate in queue-based resource allocation and concurrency control, allowing fine-grained management of scheduled task execution across distributed instances.
Relevant Code Files
| File | Purpose | Key Features |
|---|---|---|
| dbos/_scheduler.py | Dynamic scheduler implementation | Jitter application, duplicate prevention, backfill operations, manual triggering, polling loop for schedule changes, thread management, timezone support, automatic backfill, last_fired_at tracking, queue assignment |
| dbos/_scheduler_decorator.py | Static decorator-based scheduler | @scheduled decorator, definition-time schedule registration, cron expression validation, jitter application |
Related Topics
- Workflow Queues and Deduplication: DBOS uses queue-based workflow execution with built-in deduplication for concurrent requests with the same workflow ID
- DBOS System Database Architecture: The PostgreSQL-based coordination layer that enables distributed scheduler operation
- Workflow Recovery and Fault Tolerance: Mechanisms for detecting and recovering interrupted workflows in distributed deployments
- Distributed Worker Coordination: Patterns for coordinating multiple worker instances using database-backed state
- Database Transaction Isolation Levels: PostgreSQL isolation guarantees that prevent race conditions in distributed coordination scenarios