collector-core Framework
Type: Document
Status: Published
Created: Oct 31, 2025
Updated: Oct 31, 2025
Updated by: Dosu Bot

Architecture and Purpose

The collector-core framework is the central, extensible infrastructure for unified event collection and monitoring in the DaemonEye ecosystem. It provides a reusable runtime and shared operational foundation for integrating multiple monitoring domains—such as process, network, filesystem, and performance—while abstracting away common concerns like event handling, batching, backpressure, telemetry, health monitoring, configuration, and inter-process communication (IPC) [collector-core/lib.rs].

The framework is designed for extensibility and modularity, enabling both open-source and enterprise deployments to scale from single-source monitoring (e.g., process monitoring in procmond) to complex, multi-domain collection scenarios.

Core Components

Collector Runtime

The Collector struct manages the lifecycle of multiple event sources, aggregates their events, and provides shared infrastructure for health monitoring, telemetry, and graceful shutdown. It enforces constraints such as the maximum number of event sources and prevents duplicate registrations [collector-core/collector.rs].
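The two registration rules above (a cap on the number of sources and rejection of duplicates) can be illustrated with a small std-only sketch. The `Registry` type, the `RegisterError` enum, and keying uniqueness on the source name are assumptions for illustration; the real Collector stores boxed EventSource values and its error types may differ.

```rust
use std::fmt;

/// Hypothetical error type for registration failures.
#[derive(Debug, PartialEq)]
pub enum RegisterError {
    /// The configured maximum number of sources has been reached.
    TooManySources { max: usize },
    /// A source with the same name is already registered.
    Duplicate(String),
}

impl fmt::Display for RegisterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RegisterError::TooManySources { max } => write!(f, "at most {max} event sources allowed"),
            RegisterError::Duplicate(name) => write!(f, "event source '{name}' already registered"),
        }
    }
}

impl std::error::Error for RegisterError {}

/// Hypothetical registry that tracks sources by their name().
pub struct Registry {
    max_event_sources: usize,
    names: Vec<&'static str>,
}

impl Registry {
    pub fn new(max_event_sources: usize) -> Self {
        Self { max_event_sources, names: Vec::new() }
    }

    /// Registers a source name, rejecting duplicates and enforcing the limit.
    pub fn register(&mut self, name: &'static str) -> Result<(), RegisterError> {
        if self.names.contains(&name) {
            return Err(RegisterError::Duplicate(name.to_string()));
        }
        if self.names.len() >= self.max_event_sources {
            return Err(RegisterError::TooManySources { max: self.max_event_sources });
        }
        self.names.push(name);
        Ok(())
    }

    pub fn len(&self) -> usize {
        self.names.len()
    }
}
```

Checking for duplicates before the limit means a repeated name is reported as a duplicate even when the registry is already full.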

EventSource Trait

All collection components must implement the EventSource trait, which separates how events are collected from the shared operational infrastructure. The trait requires implementations to be Send + Sync so they can run concurrently, and it defines the following methods:

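use std::sync::{atomic::AtomicBool, Arc};
use async_trait::async_trait;
use tokio::sync::mpsc;
// CollectionEvent and SourceCaps are defined elsewhere in collector-core.

#[async_trait]
pub trait EventSource: Send + Sync {
    /// Unique identifier for this source.
    fn name(&self) -> &'static str;
    /// Features this source supports, used for capability negotiation.
    fn capabilities(&self) -> SourceCaps;
    /// Runs collection, sending events on `tx` until `shutdown_signal` is set.
    async fn start(
        &self,
        tx: mpsc::Sender<CollectionEvent>,
        shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()>;
    /// Stops collection and releases resources.
    async fn stop(&self) -> anyhow::Result<()>;
    /// Optional health probe; reports healthy by default.
    async fn health_check(&self) -> anyhow::Result<()> { Ok(()) }
}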

The lifecycle includes registration, capability negotiation, async startup, event generation, and graceful shutdown [collector-core/source.rs].

CollectionEvent Model

Events from all sources are represented by the unified, extensible CollectionEvent enum. Each variant is strongly typed and serializable, supporting process, network, filesystem, and performance domains. This design enables type-safe, timestamped, and extensible event handling [collector-core/event.rs].

Example:

use std::time::SystemTime;

let event = CollectionEvent::Process(ProcessEvent {
    pid: 1234,
    name: "example".to_string(),
    // ... other fields ...
    timestamp: SystemTime::now(),
});
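To show how one enum can carry several typed domains behind a uniform timestamp accessor, here is a reduced std-only sketch covering two of the four domains. Only `pid`, `name`, and `timestamp` come from the example above; the `u32` type for `pid`, the NetworkEvent fields, and the `timestamp()` and `domain()` helpers are assumptions. The real types are also serializable, which this sketch omits.

```rust
use std::time::SystemTime;

#[derive(Debug, Clone)]
pub struct ProcessEvent {
    pub pid: u32, // assumed integer type
    pub name: String,
    pub timestamp: SystemTime,
}

/// Hypothetical network event with a single illustrative field.
#[derive(Debug, Clone)]
pub struct NetworkEvent {
    pub remote_addr: String,
    pub timestamp: SystemTime,
}

/// Reduced, hypothetical version of the unified event enum.
#[derive(Debug, Clone)]
pub enum CollectionEvent {
    Process(ProcessEvent),
    Network(NetworkEvent),
}

impl CollectionEvent {
    /// Every variant is timestamped, so consumers can read it uniformly.
    pub fn timestamp(&self) -> SystemTime {
        match self {
            CollectionEvent::Process(e) => e.timestamp,
            CollectionEvent::Network(e) => e.timestamp,
        }
    }

    /// Domain label, e.g. for routing or metrics.
    pub fn domain(&self) -> &'static str {
        match self {
            CollectionEvent::Process(_) => "process",
            CollectionEvent::Network(_) => "network",
        }
    }
}
```

Because `match` must be exhaustive, adding a new domain variant forces every accessor to be updated at compile time.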

IPC Server

The CollectorIpcServer manages IPC with external components (e.g., daemoneye-agent), exchanging Protocol Buffers (protobuf) messages in CRC32-checked frames. It exposes the collector's capabilities, handles detection tasks, and updates the advertised capabilities dynamically as event sources are registered or unregistered [collector-core/ipc.rs].
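The exact wire layout is defined in collector-core/ipc.rs and is not described here. As an illustration of CRC32 framing in general, the sketch below assumes a hypothetical frame made of a 4-byte big-endian payload length, a 4-byte big-endian CRC-32 (IEEE) of the payload, and then the serialized protobuf bytes. It computes CRC-32 bitwise to stay dependency-free; production code would more likely use a crate such as crc32fast.

```rust
/// Bitwise CRC-32 (IEEE, reflected polynomial 0xEDB88320).
pub fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg(); // all ones if the low bit is set
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

#[derive(Debug, PartialEq)]
pub enum FrameError {
    TooShort,
    LengthMismatch,
    BadChecksum,
}

/// Hypothetical layout: [len: u32 BE][crc32: u32 BE][payload].
pub fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    frame.extend_from_slice(&crc32(payload).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Checks the header length and checksum, returning the payload on success.
pub fn decode_frame(frame: &[u8]) -> Result<&[u8], FrameError> {
    if frame.len() < 8 {
        return Err(FrameError::TooShort);
    }
    let len = u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    let expected_crc = u32::from_be_bytes([frame[4], frame[5], frame[6], frame[7]]);
    let payload = &frame[8..];
    if payload.len() != len {
        return Err(FrameError::LengthMismatch);
    }
    if crc32(payload) != expected_crc {
        return Err(FrameError::BadChecksum);
    }
    Ok(payload)
}
```

The checksum lets the receiver reject a corrupted message before handing it to the protobuf decoder.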

Event Source Lifecycle

  1. Registration: Event sources are registered with the Collector before it starts running.
  2. Capability Negotiation: The collector queries each source's capabilities for unified feature discovery and task routing.
  3. Startup: The collector asynchronously starts each source, passing an event channel and shutdown signal.
  4. Operation: Sources generate and send events through the channel, supporting batching and backpressure.
  5. Health Monitoring: Periodic health checks ensure sources remain operational.
  6. Shutdown: The collector coordinates graceful shutdown on signal, enforcing timeouts and cleanup [collector-core/collector.rs].
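The six steps above can be mimicked with std threads to make the control flow concrete. This is a synchronous analog, not the collector-core implementation: the real trait is async and uses a tokio channel, while `Source`, `TickSource`, and `run_lifecycle` are hypothetical names, events are plain strings, and startup and shutdown timeouts are omitted.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical synchronous stand-in for the EventSource trait.
pub trait Source: Send + Sync {
    fn name(&self) -> &'static str;
    fn start(&self, tx: Sender<String>, shutdown: Arc<AtomicBool>);
    fn health_check(&self) -> Result<(), String> {
        Ok(()) // default, mirroring the real trait
    }
}

/// Emits one event per tick until the shutdown flag is set.
pub struct TickSource(pub &'static str);

impl Source for TickSource {
    fn name(&self) -> &'static str {
        self.0
    }
    fn start(&self, tx: Sender<String>, shutdown: Arc<AtomicBool>) {
        while !shutdown.load(Ordering::Relaxed) {
            if tx.send(format!("{}: tick", self.0)).is_err() {
                break; // receiver is gone
            }
            thread::sleep(Duration::from_millis(5));
        }
    }
}

/// Startup, operation, health check, then shutdown of pre-registered sources.
/// Returns how many events were received before shutdown was signalled.
pub fn run_lifecycle(sources: Vec<Arc<dyn Source>>, wanted: usize) -> Result<usize, String> {
    let (tx, rx) = mpsc::channel();
    let shutdown = Arc::new(AtomicBool::new(false));
    let handles: Vec<_> = sources
        .iter()
        .map(|s| {
            let (s, tx, flag) = (Arc::clone(s), tx.clone(), Arc::clone(&shutdown));
            thread::spawn(move || s.start(tx, flag))
        })
        .collect();
    drop(tx); // only source threads hold senders now
    let received = rx.iter().take(wanted).count();
    let health = sources
        .iter()
        .try_for_each(|s| s.health_check().map_err(|e| format!("{}: {e}", s.name())));
    shutdown.store(true, Ordering::Relaxed); // signal graceful shutdown
    for h in handles {
        h.join().map_err(|_| "source thread panicked".to_string())?;
    }
    health.map(|()| received)
}
```

The shared `AtomicBool` plays the role of `shutdown_signal`: the collector sets it once, and each source observes it on its next loop iteration.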

Trait Implementations

The EventSource trait is the primary extensibility point. Implementations must provide a unique name, declare their supported capabilities, and implement the async start and stop methods; health_check is optional because the trait supplies a default that reports the source as healthy. All implementations must be thread-safe (Send + Sync) to support concurrent operation.

Example implementation:

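use std::{sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};
use async_trait::async_trait;
use tokio::sync::mpsc;
// EventSource, SourceCaps, and CollectionEvent come from collector-core.
struct MySource;

#[async_trait]
impl EventSource for MySource {
    fn name(&self) -> &'static str { "my-source" }
    fn capabilities(&self) -> SourceCaps { SourceCaps::PROCESS }
    async fn start(&self, tx: mpsc::Sender<CollectionEvent>, shutdown_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
        // Collection logic: poll until the collector sets the shutdown flag,
        // forwarding each event with `tx.send(event).await?`.
        while !shutdown_signal.load(Ordering::Relaxed) {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        drop(tx); // this skeleton sends nothing; dropping the sender is harmless
        Ok(())
    }
    async fn stop(&self) -> anyhow::Result<()> { Ok(()) }
}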

[collector-core/source.rs]

Capability Management

Capabilities are managed using the SourceCaps bitflags, allowing each event source to declare its supported features, such as PROCESS, NETWORK, FILESYSTEM, PERFORMANCE, REALTIME, KERNEL_LEVEL, and SYSTEM_WIDE. The collector aggregates these capabilities to provide a unified view, which is exposed via IPC for negotiation and task validation [collector-core/source.rs; collector-core/ipc.rs].

Example:

let caps = SourceCaps::PROCESS | SourceCaps::REALTIME;
if caps.contains(SourceCaps::PROCESS) { /* ... */ }
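SourceCaps is a bitflags type, so the real code presumably gets `|` and `contains` from the bitflags crate. The std-only sketch below reimplements just enough of that behavior to show how per-source capabilities could be aggregated into one collector-wide set and used to validate an incoming task. The bit values and the `aggregate` and `can_handle` functions are hypothetical.

```rust
use std::ops::BitOr;

/// Hypothetical std-only stand-in for the SourceCaps bitflags type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SourceCaps(u32);

impl SourceCaps {
    pub const NONE: Self = Self(0);
    pub const PROCESS: Self = Self(1 << 0);
    pub const NETWORK: Self = Self(1 << 1);
    pub const FILESYSTEM: Self = Self(1 << 2);
    pub const PERFORMANCE: Self = Self(1 << 3);
    pub const REALTIME: Self = Self(1 << 4);
    pub const KERNEL_LEVEL: Self = Self(1 << 5);
    pub const SYSTEM_WIDE: Self = Self(1 << 6);

    /// True if every bit in `other` is also set in `self`.
    pub fn contains(self, other: Self) -> bool {
        (self.0 & other.0) == other.0
    }
}

impl BitOr for SourceCaps {
    type Output = Self;
    fn bitor(self, rhs: Self) -> Self {
        Self(self.0 | rhs.0)
    }
}

/// Union of all registered sources' capabilities.
pub fn aggregate(per_source: &[SourceCaps]) -> SourceCaps {
    per_source.iter().fold(SourceCaps::NONE, |acc, &c| acc | c)
}

/// A task is accepted only if the collector as a whole offers what it needs.
pub fn can_handle(collector_caps: SourceCaps, required: SourceCaps) -> bool {
    collector_caps.contains(required)
}
```

Aggregating with a bitwise union means a task can require features that no single source provides alone; whether the real collector routes such tasks across sources is not covered here.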

Configuration Options

The CollectorConfig struct manages runtime configuration, including:

  • max_event_sources: Maximum number of registered event sources (default: 16)
  • event_buffer_size: Capacity of the event channels (default: 1000)
  • shutdown_timeout: Time allowed for graceful shutdown (default: 30s)
  • health_check_interval: Interval between health checks (default: 60s)
  • startup_timeout: Time allowed for an event source to start (default: 10s)
  • enable_debug_logging: Enables detailed logging
  • max_batch_size: Maximum number of events per batch (default: 100)
  • batch_timeout: Maximum time before a partial batch is flushed (default: 100ms)
  • backpressure_threshold: Buffer occupancy at which backpressure is applied (default: 80% of event_buffer_size, i.e. 800 with the default buffer)
  • max_backpressure_wait: Maximum time to wait while backpressure is in effect (default: 500ms)
  • enable_telemetry: Enables telemetry collection (default: true)
  • telemetry_interval: Interval between telemetry collections (default: 30s)
  • component_name: Component name used when loading configuration
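The max_batch_size and batch_timeout options describe a common pattern: flush a batch when it is full or when a timer expires, whichever comes first. Below is a std-only sketch of that pattern built on `std::sync::mpsc::Receiver::recv_timeout`. The `collect_batch` function is hypothetical, and collector-core's own batching (which runs on tokio) may differ in detail, for example in when the timer starts.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Gathers up to `max_batch_size` items, stopping early once `batch_timeout`
/// has elapsed since the call started or the channel disconnects.
pub fn collect_batch<T>(rx: &Receiver<T>, max_batch_size: usize, batch_timeout: Duration) -> Vec<T> {
    let deadline = Instant::now() + batch_timeout;
    let mut batch = Vec::with_capacity(max_batch_size);
    while batch.len() < max_batch_size {
        // Wait only for whatever remains of the batch window.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}
```

Under heavy load batches fill to max_batch_size quickly; under light load the timeout bounds how long a single event can wait before being flushed.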

CollectorConfig supports default values, validation, builder-style setters, and hierarchical loading from system/user files and environment variables via the daemoneye-lib ConfigLoader [collector-core/config.rs].

Example:

let config = CollectorConfig::default()
    .with_max_event_sources(4)
    .with_debug_logging(true);
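A reduced sketch of the default-plus-builder pattern described above, covering a few of the listed fields. The defaults follow the list above; the `enable_debug_logging` default of `false`, the `validate` rules, and the `backpressure_limit` helper are assumptions, and the real CollectorConfig additionally loads values through daemoneye-lib's ConfigLoader.

```rust
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct CollectorConfig {
    pub max_event_sources: usize,
    pub event_buffer_size: usize,
    pub max_batch_size: usize,
    pub batch_timeout: Duration,
    pub shutdown_timeout: Duration,
    pub enable_debug_logging: bool,
}

impl Default for CollectorConfig {
    fn default() -> Self {
        Self {
            max_event_sources: 16,
            event_buffer_size: 1000,
            max_batch_size: 100,
            batch_timeout: Duration::from_millis(100),
            shutdown_timeout: Duration::from_secs(30),
            enable_debug_logging: false, // assumed default
        }
    }
}

impl CollectorConfig {
    /// Builder-style setters consume and return `self` so calls can be chained.
    pub fn with_max_event_sources(mut self, n: usize) -> Self {
        self.max_event_sources = n;
        self
    }

    pub fn with_debug_logging(mut self, on: bool) -> Self {
        self.enable_debug_logging = on;
        self
    }

    /// Buffer occupancy at which backpressure starts (80% of the buffer).
    pub fn backpressure_limit(&self) -> usize {
        self.event_buffer_size * 8 / 10
    }

    /// Hypothetical sanity checks; the real rules may differ.
    pub fn validate(&self) -> Result<(), String> {
        if self.max_event_sources == 0 {
            return Err("max_event_sources must be > 0".into());
        }
        if self.event_buffer_size == 0 {
            return Err("event_buffer_size must be > 0".into());
        }
        if self.max_batch_size > self.event_buffer_size {
            return Err("max_batch_size must not exceed event_buffer_size".into());
        }
        Ok(())
    }
}
```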

Integration with procmond

Integration with procmond is achieved by implementing the EventSource trait in ProcessEventSource, which wraps the existing process collection logic and database operations. ProcessEventSource is registered with the collector, enabling unified event collection and lifecycle management [procmond/event_source.rs].

Example:

let process_source = ProcessEventSource::with_config(db_manager, process_config);
let mut collector = Collector::new(collector_config);
collector.register(Box::new(process_source))?;
collector.run().await?;

ProcessEventSource demonstrates dynamic capability management (e.g., adding REALTIME if the collection interval is short), flexible configuration, and emits CollectionEvent variants for unified processing.
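The dynamic-capability idea can be sketched as a pure function of the source's configuration. Only the rule "add REALTIME when the collection interval is short" comes from the text above; the 1-second cutoff, the `ProcessConfigSketch` struct, and the bit values are hypothetical, and capabilities are modeled as a plain `u32` bitmask to keep the example self-contained.

```rust
use std::time::Duration;

// Hypothetical bit assignments standing in for SourceCaps flags.
pub const PROCESS: u32 = 1 << 0;
pub const REALTIME: u32 = 1 << 4;

/// Hypothetical cutoff below which polling is treated as near-real-time.
pub const REALTIME_CUTOFF: Duration = Duration::from_secs(1);

/// Hypothetical subset of the process source's configuration.
pub struct ProcessConfigSketch {
    pub collection_interval: Duration,
}

/// Capabilities derived from configuration rather than hard-coded.
pub fn process_capabilities(config: &ProcessConfigSketch) -> u32 {
    let mut caps = PROCESS;
    if config.collection_interval < REALTIME_CUTOFF {
        caps |= REALTIME;
    }
    caps
}
```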

Multi-Domain Event Collection

The framework supports multi-domain event collection by allowing diverse event sources to be registered and managed concurrently, each emitting its own domain-specific events. The unified CollectionEvent model and aggregated capabilities let the collector handle and route events from the process, network, filesystem, and performance domains uniformly [collector-core/event.rs].
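Multi-domain collection amounts to several sources sharing one channel, with the consumer dispatching on the event variant. The std-only sketch below runs two producer threads (process and network) and a consumer that tallies events per domain. The `Event` enum and `fan_in` function are hypothetical stand-ins for CollectionEvent and the collector's event loop.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical, reduced stand-in for CollectionEvent.
#[derive(Debug)]
pub enum Event {
    Process { pid: u32 },
    Network { port: u16 },
}

/// Runs one producer per domain over a shared channel and counts events by domain.
pub fn fan_in(process_events: u32, network_events: u16) -> HashMap<&'static str, usize> {
    let (tx, rx) = mpsc::channel();
    let tx_net = tx.clone();
    let procs = thread::spawn(move || {
        for pid in 0..process_events {
            tx.send(Event::Process { pid }).unwrap();
        }
    });
    let nets = thread::spawn(move || {
        for port in 0..network_events {
            tx_net.send(Event::Network { port }).unwrap();
        }
    });
    let mut counts = HashMap::new();
    // The loop ends once both producers finish and drop their senders.
    for event in rx {
        let domain = match event {
            Event::Process { .. } => "process",
            Event::Network { .. } => "network",
        };
        *counts.entry(domain).or_insert(0) += 1;
    }
    procs.join().unwrap();
    nets.join().unwrap();
    counts
}
```

Events from the two domains interleave nondeterministically on the shared channel, so only per-domain counts, not global order, are stable across runs.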

Shared OSS and Enterprise Features

OSS and enterprise deployments share a single codebase through modular configuration, dynamic capability management, and an extensible design. For example, options such as compute_executable_hashes already exist as configuration flags (the corresponding enterprise functionality is still marked TODO), so enterprise features can be added without breaking compatibility. Features can then be extended and selectively enabled based on configuration and capability negotiation [procmond/event_source.rs].


For further details, see the collector-core source and procmond integration.