Asynchronous Programming Guidelines for DaemonEye
This document outlines standards and conventions for asynchronous Rust programming in the DaemonEye project. It covers async/await usage, task management, error handling, resource cleanup, concurrency patterns, and testing strategies using Tokio, with project-specific examples and enforceable best practices.
Async/Await Usage
DaemonEye uses Rust's async/await syntax exclusively for asynchronous operations, leveraging Tokio as the async runtime. All asynchronous interfaces, such as the EventSource trait, require async methods and are implemented using the async_trait crate to support async trait functions. All event sources and runtime components must be Send + Sync to ensure safe concurrent execution within the Tokio runtime.
Example: Async Trait Implementation
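use std::sync::{atomic::AtomicBool, Arc};

use async_trait::async_trait;
use tokio::sync::mpsc;

#[async_trait]
pub trait EventSource: Send + Sync {
    /// Starts collection and sends events on `tx`.
    async fn start(
        &self,
        tx: mpsc::Sender<CollectionEvent>,
        shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()>;

    /// Stops the source and releases its resources.
    async fn stop(&self) -> anyhow::Result<()>;
}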
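Because the trait methods take `&self` and the trait requires `Send + Sync`, any state a source mutates has to use interior mutability that is itself thread-safe. The sketch below shows one way to check that bound at compile time. `ProcessSource` and `assert_send_sync` are hypothetical names used for illustration and are not taken from the DaemonEye codebase.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical event source state (illustration only).
pub struct ProcessSource {
    name: String,
    events_sent: AtomicU64,
    shutdown: Arc<AtomicBool>,
}

impl ProcessSource {
    pub fn new(name: &str, shutdown: Arc<AtomicBool>) -> Self {
        Self {
            name: name.to_string(),
            events_sent: AtomicU64::new(0),
            shutdown,
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    /// Counts an event through `&self`; returns the new total.
    pub fn record_event(&self) -> u64 {
        self.events_sent.fetch_add(1, Ordering::Relaxed) + 1
    }

    pub fn is_shutting_down(&self) -> bool {
        self.shutdown.load(Ordering::Acquire)
    }
}

/// Compiles only when `T` is `Send + Sync`; does nothing at run time.
pub fn assert_send_sync<T: Send + Sync>() {}
```

Calling `assert_send_sync::<ProcessSource>()` in a unit test turns a missing bound into a compile error. Replacing `AtomicU64` with `Cell<u64>`, or `Arc` with `Rc`, would make that call fail to compile.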
Task Management
Tasks are managed using tokio::spawn for concurrent execution. Handles to spawned tasks are stored (e.g., in hash maps) for lifecycle management and coordinated shutdown. Use atomic flags (Arc<AtomicBool>) to signal shutdown, and enforce timeouts for startup and shutdown to prevent indefinite blocking. Each event source, health check, and event processor runs in its own Tokio task.
Example: Spawning and Managing Tasks
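// Run startup in its own task, bounded by `startup_timeout`.
let handle = tokio::spawn(async move {
    let result = timeout(startup_timeout, source.start(event_tx, shutdown_signal)).await;
    // handle result...
});
// Keep the handle so the task can be awaited or aborted at shutdown.
self.source_handles.insert(source_name, handle);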
Error Handling
Use anyhow::Result for error propagation in all async functions. Log errors using the tracing crate. Critical errors (e.g., startup failures) should stop the affected source and be logged as errors; non-critical errors (e.g., transient collection failures) should be logged and allow continued operation. Prefer early returns with anyhow::bail! for unrecoverable errors.
Example: Error Handling in Async Context
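// `result` is the value returned by `timeout(startup_timeout, source.start(..))`.
match result {
    Ok(Ok(())) => {
        info!("Event source started successfully");
        Ok(())
    }
    Ok(Err(e)) => {
        error!(error = %e, "Event source failed to start");
        Err(e)
    }
    Err(_) => {
        // The outer `Err` means the timeout elapsed before `start` returned.
        error!("Event source startup timed out");
        Err(anyhow::anyhow!("Source startup timeout"))
    }
}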
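The split between critical and non-critical errors can be made explicit in a small policy function. The following is a minimal sketch that assumes a hypothetical `SourceError` enum standing in for the `anyhow` errors described above; the variant names and the `Action` type are illustrative, not DaemonEye APIs.

```rust
use std::fmt;

/// Hypothetical error categories (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub enum SourceError {
    StartupFailed(String),
    StartupTimeout,
    TransientCollection(String),
}

impl fmt::Display for SourceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SourceError::StartupFailed(why) => write!(f, "source failed to start: {}", why),
            SourceError::StartupTimeout => write!(f, "source startup timed out"),
            SourceError::TransientCollection(why) => {
                write!(f, "transient collection failure: {}", why)
            }
        }
    }
}

/// What the caller should do after logging the error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    StopSource,
    Continue,
}

/// Critical errors stop the affected source; transient ones do not.
pub fn action_for(err: &SourceError) -> Action {
    match err {
        SourceError::StartupFailed(_) | SourceError::StartupTimeout => Action::StopSource,
        SourceError::TransientCollection(_) => Action::Continue,
    }
}
```

Keeping the decision in one function means the logging call sites only need to log and then act on the returned `Action`, so the policy cannot drift between sources.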
Resource Cleanup and Graceful Shutdown
Signal shutdown using an atomic flag (Arc<AtomicBool>) or a Tokio oneshot channel. Await all spawned tasks under a timeout so that shutdown cannot hang. Tasks still running when the timeout expires must be aborted explicitly (for example with JoinHandle::abort), because dropping a JoinHandle detaches the task rather than cancelling it. Clean up resources such as socket files, connection pools, or temporary files as part of shutdown. Implement the stop method in event sources so that cleanup is quick and non-blocking.
Example: Coordinated Shutdown
// Signal shutdown
shutdown_signal.store(true, Ordering::Relaxed);

// Wait for all tasks to complete with timeout
let shutdown_timeout = self.config.shutdown_timeout;
match timeout(shutdown_timeout, try_join_all(all_handles)).await {
    Ok(Ok(_)) => info!("All components shut down cleanly"),
    // A task panicked or was cancelled (JoinError).
    Ok(Err(e)) => error!(error = %e, "Component shutdown error"),
    // The handles are dropped here, which detaches the tasks without cancelling them.
    Err(_) => warn!("Shutdown timeout exceeded, forcing exit"),
}
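For resources such as socket files, one option is to tie cleanup to a guard value so that it runs whenever the owner goes out of scope, including when the owning future is dropped. The sketch below assumes a hypothetical `SocketFileGuard` type and creates an empty placeholder file where a real source would bind a socket.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical guard that removes its file when dropped (illustration only).
pub struct SocketFileGuard {
    path: PathBuf,
}

impl SocketFileGuard {
    /// Creates an empty placeholder file; a real source would bind a socket here.
    pub fn create(path: impl Into<PathBuf>) -> io::Result<Self> {
        let path = path.into();
        fs::File::create(&path)?;
        Ok(Self { path })
    }

    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl Drop for SocketFileGuard {
    fn drop(&mut self) {
        // Best effort: `drop` cannot return an error, so a failure is ignored.
        let _ = fs::remove_file(&self.path);
    }
}
```

The `Drop` body does a single short filesystem call, which keeps it in line with the requirement that cleanup be quick. Longer cleanup work is better placed in the `stop` method, where it can be awaited and bounded by a timeout.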
Concurrency Patterns
DaemonEye uses Tokio's mpsc channels for async communication, RwLock and Mutex for shared mutable state, and Semaphore for backpressure and connection limiting. Use atomic types for counters and shutdown signaling. The tokio::select! macro is used to concurrently monitor multiple async events, such as event collection, shutdown signals, and timeouts. Circuit breaker and load balancing patterns are used in IPC and connection management.
Example: Concurrent Event Loop with select!
loop {
    tokio::select! {
        _ = collection_interval.tick() => {
            // Periodic collection
        }
        // Poll the shutdown flag every 100 ms.
        _ = async {
            while !shutdown_signal.load(Ordering::Relaxed) {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        } => {
            // Shutdown detected
            break;
        }
    }
}
Backpressure Example:
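// Assumes the enclosing function returns `anyhow::Result`.
let permit = match timeout(max_backpressure_wait, backpressure_semaphore.acquire()).await {
    Ok(Ok(permit)) => permit,
    // The semaphore was closed, so no further permits will be issued.
    Ok(Err(_)) => anyhow::bail!("backpressure semaphore closed"),
    // No permit became available within `max_backpressure_wait`.
    Err(_) => anyhow::bail!("timed out waiting for backpressure permit"),
};
// ... process the event while holding the permit ...
// Release permit after processing
drop(permit);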
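The circuit breaker pattern mentioned above can be sketched as a small state machine. This is a minimal illustration under assumed names (`CircuitBreaker`, `State`, a consecutive-failure threshold, and a fixed cooldown); it does not describe how DaemonEye's IPC layer actually implements the pattern. Time is passed in as a parameter so the logic can be tested without sleeping.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Closed,   // requests flow normally
    Open,     // requests are rejected until the cooldown elapses
    HalfOpen, // cooldown elapsed; a trial request is allowed
}

/// Hypothetical circuit breaker (illustration only).
pub struct CircuitBreaker {
    failure_threshold: u32,
    cooldown: Duration,
    consecutive_failures: u32,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        Self { failure_threshold, cooldown, consecutive_failures: 0, opened_at: None }
    }

    pub fn state(&self, now: Instant) -> State {
        match self.opened_at {
            None => State::Closed,
            Some(t) if now.saturating_duration_since(t) >= self.cooldown => State::HalfOpen,
            Some(_) => State::Open,
        }
    }

    pub fn allow_request(&self, now: Instant) -> bool {
        self.state(now) != State::Open
    }

    /// A success closes the breaker and resets the failure count.
    pub fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.opened_at = None;
    }

    /// Opens (or re-opens) the breaker once the threshold is reached.
    pub fn record_failure(&mut self, now: Instant) {
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        if self.consecutive_failures >= self.failure_threshold {
            self.opened_at = Some(now);
        }
    }
}
```

In async code a value like this would typically sit behind a `Mutex` and be consulted before each connection attempt, so that a failing peer is not retried on every event.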
Testing Strategies with Tokio
Use #[tokio::test] for all async tests. Mock event sources by implementing the EventSource trait. Use temporary resources (e.g., files, sockets) and assert on async function results. Test edge cases and failure modes using controlled delays, timeouts, and simulated errors. Chaos testing is performed by introducing random failures, backpressure, and resource contention to ensure system resilience.
Example: Async Test with Mock Source
#[tokio::test]
async fn test_source_registration() {
    let mut collector = Collector::new(CollectorConfig::default());
    let source = TestEventSource::new("test", SourceCaps::PROCESS);
    collector.register(Box::new(source)).expect("Failed to register source");
    assert_eq!(collector.source_count(), 1);
}
Chaos Testing Example:
#[tokio::test]
async fn test_random_event_failure_recovery() {
    let chaos_source =
        ChaosEventSource::new("chaos", SourceCaps::PROCESS, FailureMode::RandomEventFailure)
            .with_events(100)
            .with_failure_probability(0.3);
    // Register and run collector, assert resilience to failures
}
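Random failure injection is easier to debug when a failing run can be replayed. One way to get that is to drive the failure decision from a seeded generator. The sketch below assumes a hypothetical `FailureInjector` helper built on a small xorshift generator; it is not the implementation behind `ChaosEventSource`.

```rust
/// Hypothetical seeded failure injector (illustration only).
pub struct FailureInjector {
    state: u64,
    probability: f64,
}

impl FailureInjector {
    pub fn new(seed: u64, probability: f64) -> Self {
        Self {
            // xorshift must not start from zero, so a zero seed is bumped to 1.
            state: seed.max(1),
            probability: probability.clamp(0.0, 1.0),
        }
    }

    /// Advances the xorshift64 state and maps it into [0, 1).
    fn next_unit(&mut self) -> f64 {
        let mut x = self.state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.state = x;
        // Use the top 53 bits so the value fits an f64 mantissa exactly.
        (x >> 11) as f64 / (1u64 << 53) as f64
    }

    /// Returns true when the next simulated event should fail.
    pub fn should_fail(&mut self) -> bool {
        self.next_unit() < self.probability
    }
}
```

Logging the seed at the start of each chaos test lets a failing sequence be reproduced by constructing the injector with the same seed and probability.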
Project-Specific Conventions
- Register all event sources before running the collector. Enforce unique source names and maximum source limits.
- Use structured logging with the tracing crate for all async operations.
- Document all async trait methods with lifecycle, error handling, and shutdown expectations.
- Prefer non-blocking, timeout-bounded operations in all async code.
- Use batching and backpressure controls to avoid deadlocks and resource exhaustion.
- Test all async code paths, including edge cases and failure scenarios, using Tokio's async test framework and chaos/failure simulation patterns.
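The registration rules in the first convention (register before running, unique names, a maximum source count) can be enforced in one place. The following is a minimal sketch with hypothetical names (`SourceRegistry`, `RegisterError`, `mark_running`); the real `Collector` may structure this differently.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq, Eq)]
pub enum RegisterError {
    DuplicateName(String),
    LimitReached { max: usize },
    AlreadyRunning,
}

/// Hypothetical registry, generic over the source type (illustration only).
pub struct SourceRegistry<S> {
    max_sources: usize,
    running: bool,
    sources: HashMap<String, S>,
}

impl<S> SourceRegistry<S> {
    pub fn new(max_sources: usize) -> Self {
        Self { max_sources, running: false, sources: HashMap::new() }
    }

    /// Rejects late registration, duplicate names, and registrations over the limit.
    pub fn register(&mut self, name: &str, source: S) -> Result<(), RegisterError> {
        if self.running {
            return Err(RegisterError::AlreadyRunning);
        }
        if self.sources.contains_key(name) {
            return Err(RegisterError::DuplicateName(name.to_string()));
        }
        if self.sources.len() >= self.max_sources {
            return Err(RegisterError::LimitReached { max: self.max_sources });
        }
        self.sources.insert(name.to_string(), source);
        Ok(())
    }

    /// Called when the collector starts; registration is closed afterwards.
    pub fn mark_running(&mut self) {
        self.running = true;
    }

    pub fn source_count(&self) -> usize {
        self.sources.len()
    }
}
```

Returning a typed error for each rule lets tests assert on the exact rejection reason instead of matching on log output.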
By following these guidelines, DaemonEye ensures robust, maintainable, and resilient asynchronous Rust code throughout the project.