collector-core Capabilities and Lifecycle
Type: Document
Status: Published
Created: Dec 3, 2025
Updated: Dec 3, 2025
Updated by: Dosu Bot

Capability Management System

The collector-core framework uses a capability management system to enable dynamic feature negotiation and efficient task routing between collectors and event sources. Capabilities are represented as bitflags in the SourceCaps struct, allowing each event source to declare its supported monitoring domains and operational features. The available capability flags include:

  • PROCESS: Process monitoring
  • NETWORK: Network monitoring
  • FILESYSTEM: Filesystem monitoring
  • PERFORMANCE: Performance monitoring
  • REALTIME: Real-time event streaming
  • KERNEL_LEVEL: Kernel-level access
  • SYSTEM_WIDE: System-wide monitoring

These flags can be combined using bitwise operations to represent multiple capabilities. For example:

use collector_core::SourceCaps;

// Process monitoring with real-time and system-wide capabilities
let process_caps = SourceCaps::PROCESS | SourceCaps::REALTIME | SourceCaps::SYSTEM_WIDE;

// Network monitoring with kernel-level access
let network_caps = SourceCaps::NETWORK | SourceCaps::KERNEL_LEVEL | SourceCaps::REALTIME;

// Check if a capability is present
assert!(process_caps.contains(SourceCaps::PROCESS));
assert!(!process_caps.contains(SourceCaps::NETWORK));

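To make the bitflag mechanics and the routing idea concrete, here is a minimal, dependency-free sketch. The `Caps` type, its bit positions, and the `route` helper are assumptions made for illustration; they are not the crate's actual `SourceCaps` definition or routing code.

```rust
use std::ops::BitOr;

/// Hypothetical stand-in for a capability set; NOT the crate's real `SourceCaps`.
/// The bit positions below are assumed for illustration only.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Caps(u32);

impl Caps {
    pub const PROCESS: Caps = Caps(1 << 0);
    pub const NETWORK: Caps = Caps(1 << 1);
    pub const REALTIME: Caps = Caps(1 << 4);
    pub const SYSTEM_WIDE: Caps = Caps(1 << 6);

    /// True when every bit set in `other` is also set in `self`.
    pub const fn contains(self, other: Caps) -> bool {
        (self.0 & other.0) == other.0
    }
}

impl BitOr for Caps {
    type Output = Caps;

    /// Combines two capability sets into their union.
    fn bitor(self, rhs: Caps) -> Caps {
        Caps(self.0 | rhs.0)
    }
}

/// Returns the names of the sources whose capabilities cover `required`,
/// preserving the order in which the sources were listed.
pub fn route<'a>(sources: &[(&'a str, Caps)], required: Caps) -> Vec<&'a str> {
    sources
        .iter()
        .filter(|(_, caps)| caps.contains(required))
        .map(|(name, _)| *name)
        .collect()
}
```

With two hypothetical sources, one declaring `PROCESS | REALTIME` and one declaring `NETWORK | REALTIME`, a task requiring `REALTIME` matches both, while a task requiring `PROCESS | REALTIME` matches only the first.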

EventSource Trait and Lifecycle

The core abstraction for event sources is the EventSource trait. This trait standardizes the interface and lifecycle management for all event sources in the framework. Implementations must provide:

  • name(): Returns a unique, stable name for the event source.
  • capabilities(): Returns the static set of capabilities supported by the source.
  • start(): Begins event collection, sending events through a provided channel until a shutdown signal is received.
  • stop(): Signals the source to stop collecting events and performs cleanup.
  • health_check(): Optional; reports the health status of the source. The default implementation always reports healthy.

The lifecycle of an event source follows these phases:

  1. Registration: The event source is registered with a Collector.
  2. Capability Negotiation: The collector queries the source's capabilities.
  3. Startup: The collector calls start(), passing an event channel and shutdown signal.
  4. Operation: The source emits events through the channel.
  5. Shutdown: The collector calls stop() for graceful cleanup.

All event sources must be Send + Sync to support concurrent operation. The start() method should use tokio::select! to watch both its collection work and the shutdown signal, and it should not block indefinitely. Errors should be logged and handled gracefully so that collection continues where possible. The stop() method should return quickly without blocking. It should return an error only if cleanup fails, and even then the source is considered stopped.

Example implementation:

use collector_core::{EventSource, SourceCaps, CollectionEvent};
use async_trait::async_trait;
use std::sync::{Arc, atomic::AtomicBool};
use tokio::sync::mpsc;

struct MyEventSource;

#[async_trait]
impl EventSource for MyEventSource {
    fn name(&self) -> &'static str { "my-source" }
    fn capabilities(&self) -> SourceCaps { SourceCaps::PROCESS | SourceCaps::REALTIME }
    async fn start(&self, tx: mpsc::Sender<CollectionEvent>, shutdown_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
        // Event collection logic here
        Ok(())
    }
    async fn stop(&self) -> anyhow::Result<()> { Ok(()) }
}

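The skeleton above leaves the body of start() empty. The following is a dependency-free sketch of the cooperative shutdown pattern it implies: emit events through a channel until a shared AtomicBool is set. It deliberately uses std threads and std::sync::mpsc instead of tokio and tokio::select!, so it is a synchronous analogue rather than the framework's actual implementation; `run_source` and `collect_then_stop` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Emits an increasing counter until `shutdown` is set or the receiver is dropped.
/// Returns the number of events that were sent.
pub fn run_source(tx: Sender<u64>, shutdown: Arc<AtomicBool>) -> u64 {
    let mut sent = 0;
    while !shutdown.load(Ordering::Acquire) {
        if tx.send(sent).is_err() {
            break; // The receiver is gone, so there is nobody left to collect for.
        }
        sent += 1;
        // Short pause between events so the loop re-checks the flag regularly.
        thread::sleep(Duration::from_millis(1));
    }
    sent
}

/// Plays the collector's role: starts the source on a thread, reads `want`
/// events, then sets the shutdown flag and waits for the source to finish.
pub fn collect_then_stop(want: usize) -> (Vec<u64>, u64) {
    let (tx, rx) = mpsc::channel();
    let shutdown = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&shutdown);
    let handle = thread::spawn(move || run_source(tx, flag));

    let received: Vec<u64> = rx.iter().take(want).collect();
    shutdown.store(true, Ordering::Release);
    let sent = handle.join().expect("source thread panicked");
    (received, sent)
}
```

The key property is that the source checks the flag between units of work, so a shutdown request is observed within one loop iteration instead of blocking indefinitely.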

Collector and CollectorRuntime Lifecycle

The Collector struct manages event sources, event aggregation, and shared operational infrastructure. Event sources must be registered before the collector runs. Registration enforces a maximum number of sources and rejects duplicate names. The collector aggregates the capabilities of all registered sources to provide a unified view.
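The registration rules and capability aggregation can be sketched as follows. This is an illustrative model, not the Collector's code: `Registry`, `RegisterError`, and the use of a raw `u32` bitmask for capabilities are all assumptions.

```rust
/// Hypothetical error type for rejected registrations.
#[derive(Debug, PartialEq, Eq)]
pub enum RegisterError {
    TooManySources { max: usize },
    DuplicateName(String),
}

/// Hypothetical registry; capabilities are modelled as a raw bitmask.
pub struct Registry {
    max_sources: usize,
    sources: Vec<(String, u32)>,
}

impl Registry {
    pub fn new(max_sources: usize) -> Self {
        Self { max_sources, sources: Vec::new() }
    }

    /// Registers a source, enforcing the source limit and name uniqueness.
    pub fn register(&mut self, name: &str, caps: u32) -> Result<(), RegisterError> {
        if self.sources.len() >= self.max_sources {
            return Err(RegisterError::TooManySources { max: self.max_sources });
        }
        if self.sources.iter().any(|(n, _)| n.as_str() == name) {
            return Err(RegisterError::DuplicateName(name.to_string()));
        }
        self.sources.push((name.to_string(), caps));
        Ok(())
    }

    /// Union of the capability masks of all registered sources.
    pub fn aggregated_caps(&self) -> u32 {
        self.sources.iter().fold(0, |acc, (_, caps)| acc | *caps)
    }
}
```

Because aggregation is a bitwise union, the unified view answers "can any registered source do X?" with a single mask check.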

The CollectorRuntime struct manages the operational state, including event processing, health monitoring, graceful shutdown, telemetry collection, backpressure handling, and capability tracking. Event sources are started asynchronously with a shutdown signal and startup timeout, and their handles are tracked for graceful shutdown.

Event processing is performed in batches, with backpressure managed via a semaphore. The batch size and timeout are configurable. Health monitoring and telemetry collection run as asynchronous tasks, periodically checking health status and resource usage. The runtime supports graceful shutdown by listening for OS signals and coordinating the shutdown of all components, including event sources and the IPC server.
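The interaction between batch size and batch timeout can be illustrated with a small std-only sketch. `next_batch` is a hypothetical helper built on std::sync::mpsc; the runtime itself is asynchronous and also applies semaphore-based backpressure, which is not shown here.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects up to `max_batch_size` items from `rx`. Returns early, possibly
/// with a partial or empty batch, once `batch_timeout` has elapsed or the
/// channel has been closed by all senders.
pub fn next_batch<T>(
    rx: &Receiver<T>,
    max_batch_size: usize,
    batch_timeout: Duration,
) -> Vec<T> {
    let deadline = Instant::now() + batch_timeout;
    let mut batch = Vec::with_capacity(max_batch_size);
    while batch.len() < max_batch_size {
        // Time left in this batch window; zero once the deadline has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}
```

A full batch is returned immediately without waiting for the timeout, while a quiet channel yields whatever arrived within the window. This is the usual trade-off between throughput (larger batches) and latency (shorter timeouts).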

The IPC server communicates with daemoneye-agent and maintains aggregated capabilities, updating them dynamically as event sources change.


Configuration Options

Configuration for the collector runtime is managed by the CollectorConfig struct. Key options include:

  • max_event_sources: Maximum number of event sources (default: 16)
  • event_buffer_size: Buffer size for the event channel (default: 1000)
  • shutdown_timeout: Timeout for graceful shutdown (default: 30s)
  • health_check_interval: Interval for health checks (default: 60s)
  • startup_timeout: Maximum time to wait for event source startup (default: 10s)
  • enable_debug_logging: Enable detailed logging (default: false)
  • max_batch_size: Maximum events per batch (default: 100)
  • batch_timeout: Timeout for event batching (default: 100ms)
  • backpressure_threshold: Threshold for event channel backpressure (default: 800)
  • max_backpressure_wait: Maximum wait time for backpressure (default: 500ms)
  • enable_telemetry: Enable telemetry collection (default: true)
  • telemetry_interval: Telemetry collection interval (default: 30s)
  • component_name: Name for configuration loading (default: "collector-core")

Configuration can be customized using builder-style methods:

use collector_core::CollectorConfig;
use std::time::Duration;

let config = CollectorConfig::new()
    .with_max_event_sources(32)
    .with_event_buffer_size(2000)
    .with_shutdown_timeout(Duration::from_secs(60))
    .with_health_check_interval(Duration::from_secs(120))
    .with_debug_logging(true);

Environment variable overrides are supported using a prefix based on the component name, allowing runtime customization of key options. Configuration can also be loaded from hierarchical sources via daemoneye-lib's ConfigLoader, with validation to ensure operational safety.
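
The override-then-validate flow can be sketched without any dependencies. Everything in this example is an assumption made for illustration: the `SketchConfig` struct, the `PREFIX_OPTION` variable naming scheme produced by `env_key`, and the validation rule that the backpressure threshold must be below the buffer size. The actual naming and validation rules are defined by CollectorConfig and daemoneye-lib's ConfigLoader.

```rust
/// Hypothetical subset of the options listed above, using the documented defaults.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub max_event_sources: usize,
    pub event_buffer_size: usize,
    pub backpressure_threshold: usize,
}

impl Default for SketchConfig {
    fn default() -> Self {
        Self { max_event_sources: 16, event_buffer_size: 1000, backpressure_threshold: 800 }
    }
}

/// Builds a variable name from a component name and an option name.
/// The upper-case, underscore-separated scheme is an assumption.
pub fn env_key(component: &str, option: &str) -> String {
    format!("{}_{}", component, option).replace('-', "_").to_uppercase()
}

/// Assumed safety checks; the real validation rules may differ.
pub fn validate(cfg: &SketchConfig) -> Result<(), String> {
    if cfg.max_event_sources == 0 {
        return Err("max_event_sources must be at least 1".to_string());
    }
    if cfg.backpressure_threshold >= cfg.event_buffer_size {
        return Err("backpressure_threshold must be below event_buffer_size".to_string());
    }
    Ok(())
}

/// Applies overrides found through `lookup`, then validates the result.
/// In a real program `lookup` could be `|k| std::env::var(k).ok()`.
pub fn apply_overrides(
    mut cfg: SketchConfig,
    component: &str,
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<SketchConfig, String> {
    if let Some(raw) = lookup(&env_key(component, "max_event_sources")) {
        cfg.max_event_sources = raw
            .parse::<usize>()
            .map_err(|e| format!("max_event_sources: {}", e))?;
    }
    if let Some(raw) = lookup(&env_key(component, "event_buffer_size")) {
        cfg.event_buffer_size = raw
            .parse::<usize>()
            .map_err(|e| format!("event_buffer_size: {}", e))?;
    }
    validate(&cfg)?;
    Ok(cfg)
}
```

Passing the lookup as a closure keeps the sketch testable without touching the process environment. Validating after overrides are applied means an override that conflicts with another option, such as a buffer size below the backpressure threshold, is rejected instead of silently accepted.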

Extending collector-core with New Event Sources

To add a new event source, implement the EventSource trait for your source and register it with a Collector instance. For example:

use collector_core::{Collector, CollectorConfig, EventSource, CollectionEvent, SourceCaps};
use async_trait::async_trait;
use tokio::sync::mpsc;
use std::sync::{Arc, atomic::AtomicBool};

struct MyEventSource;

#[async_trait]
impl EventSource for MyEventSource {
    fn name(&self) -> &'static str { "my-source" }
    fn capabilities(&self) -> SourceCaps { SourceCaps::PROCESS | SourceCaps::REALTIME }
    async fn start(&self, tx: mpsc::Sender<CollectionEvent>, shutdown_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
        // Event collection logic here
        Ok(())
    }
    async fn stop(&self) -> anyhow::Result<()> { Ok(()) }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = CollectorConfig::default();
    let mut collector = Collector::new(config);

    collector.register(Box::new(MyEventSource))?;
    collector.run().await
}


Event Model

The framework supports multiple event domains using the unified CollectionEvent enum. Each event type, such as ProcessEvent or NetworkEvent, includes a timestamp and domain-specific fields. All events are serializable via serde for storage and transmission.

use collector_core::{CollectionEvent, ProcessEvent, NetworkEvent};
use std::time::SystemTime;

let process_event = CollectionEvent::Process(ProcessEvent {
    pid: 1234,
    name: "example".to_string(),
    timestamp: SystemTime::now(),
    // ... other fields
});

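One benefit of a unified enum is that consumers can handle every domain through a single type. The sketch below shows that idea with trimmed-down, hypothetical types: `Event`, its two variants, the `connection_id` field, and the `timestamp()` and `domain()` helpers are assumptions, and the real structs carry more fields and derive serde traits.

```rust
use std::time::SystemTime;

/// Hypothetical, trimmed-down process event.
#[derive(Debug, Clone)]
pub struct ProcessEvent {
    pub pid: u32,
    pub name: String,
    pub timestamp: SystemTime,
}

/// Hypothetical, trimmed-down network event.
#[derive(Debug, Clone)]
pub struct NetworkEvent {
    pub connection_id: String,
    pub timestamp: SystemTime,
}

/// Stand-in for the unified event enum.
#[derive(Debug, Clone)]
pub enum Event {
    Process(ProcessEvent),
    Network(NetworkEvent),
}

impl Event {
    /// Returns the timestamp regardless of which domain the event belongs to.
    pub fn timestamp(&self) -> SystemTime {
        match self {
            Event::Process(e) => e.timestamp,
            Event::Network(e) => e.timestamp,
        }
    }

    /// Returns a short label for the event's domain.
    pub fn domain(&self) -> &'static str {
        match self {
            Event::Process(_) => "process",
            Event::Network(_) => "network",
        }
    }
}
```

Because the match is exhaustive, adding a new variant forces every such helper to be updated at compile time.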

Testing and Performance Considerations

collector-core includes a comprehensive test suite covering unit, integration, performance, security, property-based, chaos, and compatibility tests. Tests verify capability flag correctness, event source registration, configuration validation, event batching, telemetry integration, and graceful shutdown.

Performance characteristics include event throughput greater than 1000 events/second, runtime overhead under 5% CPU usage, bounded memory usage under load, and graceful shutdown within 500ms.

Run the test suite with:

cargo test -p collector-core

Run integration tests:

cargo test -p collector-core --test integration_test

Run performance benchmarks:

cargo bench -p collector-core
