collector-core Framework
Type: Document
Status: Published
Created: Oct 31, 2025
Updated: Apr 11, 2026
Updated by: Dosu Bot

Architecture and Purpose#

The collector-core framework is the central, extensible infrastructure for unified event collection and monitoring in the DaemonEye ecosystem. It provides a reusable runtime and shared operational foundation for integrating multiple monitoring domains (process, network, filesystem, and performance) while abstracting away common concerns: event handling, batching, backpressure, telemetry, health monitoring, configuration, and inter-process communication (IPC) [collector-core/lib.rs].

The framework is designed for extensibility and modularity, enabling both open-source and enterprise deployments to scale from single-source monitoring (e.g., process monitoring in procmond) to complex, multi-domain collection scenarios.

Core Components#

Collector Runtime#

The Collector struct manages the lifecycle of multiple event sources, aggregates their events, and provides shared infrastructure for health monitoring, telemetry, and graceful shutdown. It enforces constraints such as the maximum number of event sources and prevents duplicate registrations [collector-core/collector.rs].
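
The two registration constraints mentioned above can be illustrated with a small standalone sketch. It is not the Collector's actual code: `SourceRegistry`, `RegisterError`, and the order of the checks are hypothetical names and choices made for illustration, and only source names are tracked.

```rust
use std::collections::HashSet;

/// Hypothetical error type; the real collector's errors may look different.
#[derive(Debug, PartialEq)]
pub enum RegisterError {
    TooManySources { max: usize },
    DuplicateName(String),
}

/// Stand-in for the collector's bookkeeping of registered sources.
pub struct SourceRegistry {
    max_event_sources: usize,
    names: HashSet<String>,
}

impl SourceRegistry {
    pub fn new(max_event_sources: usize) -> Self {
        Self { max_event_sources, names: HashSet::new() }
    }

    /// Register a source by name, rejecting duplicates and overflow.
    pub fn register(&mut self, name: &str) -> Result<(), RegisterError> {
        if self.names.contains(name) {
            return Err(RegisterError::DuplicateName(name.to_string()));
        }
        if self.names.len() >= self.max_event_sources {
            return Err(RegisterError::TooManySources { max: self.max_event_sources });
        }
        self.names.insert(name.to_string());
        Ok(())
    }

    /// Number of sources registered so far.
    pub fn len(&self) -> usize {
        self.names.len()
    }
}
```

Checking for duplicates before the capacity limit means a repeated name is reported as a duplicate even when the registry is full; the real runtime may order these checks differently.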

EventSource Trait#

All collection components must implement the EventSource trait, which abstracts the collection methodology from the operational infrastructure. This trait requires implementations to be Send + Sync for concurrency and defines the following methods:

#[async_trait]
pub trait EventSource: Send + Sync {
    fn name(&self) -> &'static str;
    fn capabilities(&self) -> SourceCaps;
    async fn start(
        &self,
        tx: mpsc::Sender<CollectionEvent>,
        shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()>;
    async fn stop(&self) -> anyhow::Result<()>;
    async fn health_check(&self) -> anyhow::Result<()> { Ok(()) }
}

The lifecycle includes registration, capability negotiation, async startup, event generation, and graceful shutdown [collector-core/source.rs].

CollectionEvent Model#

Events from all sources are represented by the unified, extensible CollectionEvent enum. Each variant is strongly typed and serializable, supporting process, network, filesystem, and performance domains. This design enables type-safe, timestamped, and extensible event handling [collector-core/event.rs].

Example:

let event = CollectionEvent::Process(ProcessEvent {
    pid: 1234,
    name: "example".to_string(),
    // ... other fields ...
    timestamp: SystemTime::now(),
});

IPC Server#

The CollectorIpcServer manages IPC communication with external components (e.g., daemoneye-agent), using a protobuf-based protocol with CRC32 framing. It exposes the collector's capabilities, handles detection tasks, and supports dynamic capability updates as event sources are registered or unregistered [collector-core/ipc.rs].
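
To make the idea of CRC32 framing concrete, here is a minimal sketch that wraps an opaque payload in a checksummed frame. The layout (4-byte little-endian length, 4-byte little-endian CRC-32, then the payload) is an assumption for illustration only; the actual DaemonEye wire format is not specified in this document. `encode_frame`, `decode_frame`, and `FrameError` are hypothetical names, and the payload would be a serialized protobuf message in practice.

```rust
/// Bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320).
pub fn crc32(data: &[u8]) -> u32 {
    let mut crc: u32 = 0xFFFF_FFFF;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            // All ones when the low bit is set, zero otherwise.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

#[derive(Debug, PartialEq)]
pub enum FrameError {
    Truncated,
    LengthMismatch,
    ChecksumMismatch,
}

/// Assumed layout: [len: u32 LE][crc32: u32 LE][payload bytes].
pub fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    frame.extend_from_slice(&crc32(payload).to_le_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Validate the header and checksum, returning the payload on success.
pub fn decode_frame(frame: &[u8]) -> Result<&[u8], FrameError> {
    if frame.len() < 8 {
        return Err(FrameError::Truncated);
    }
    let len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    let expected = u32::from_le_bytes([frame[4], frame[5], frame[6], frame[7]]);
    let payload = &frame[8..];
    if payload.len() != len {
        return Err(FrameError::LengthMismatch);
    }
    if crc32(payload) != expected {
        return Err(FrameError::ChecksumMismatch);
    }
    Ok(payload)
}
```

The checksum lets the receiver reject a corrupted frame before attempting to decode the message inside it.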

Event Source Lifecycle#

  1. Registration: Event sources are registered with the Collector before runtime.
  2. Capability Negotiation: The collector queries each source's capabilities for unified feature discovery and task routing.
  3. Startup: The collector asynchronously starts each source, passing an event channel and shutdown signal.
  4. Operation: Sources generate and send events through the channel, supporting batching and backpressure.
  5. Health Monitoring: Periodic health checks ensure sources remain operational.
  6. Shutdown: The collector coordinates graceful shutdown on signal, enforcing timeouts and cleanup [collector-core/collector.rs].
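
The startup, operation, and shutdown steps can be sketched with standard-library threads in place of the async runtime. This is an illustration under stated assumptions, not the framework's code: `spawn_source` and `run_until` are hypothetical helpers, events are plain `u64` counters, and a bounded `sync_channel` stands in for the collector's event channel.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawn a toy source that emits counters until shutdown is requested.
/// Returns a handle that yields the number of events successfully sent.
pub fn spawn_source(tx: SyncSender<u64>, shutdown: Arc<AtomicBool>) -> JoinHandle<u64> {
    thread::spawn(move || {
        let mut sent = 0u64;
        while !shutdown.load(Ordering::SeqCst) {
            // A bounded channel blocks when full: a simple form of backpressure.
            if tx.send(sent).is_err() {
                break; // the receiver was dropped
            }
            sent += 1;
            thread::sleep(Duration::from_millis(1));
        }
        sent
    })
}

/// Start the source, wait for `wanted` events, then shut down gracefully.
pub fn run_until(wanted: usize) -> (Vec<u64>, u64) {
    let (tx, rx) = sync_channel(8);
    let shutdown = Arc::new(AtomicBool::new(false));
    let handle = spawn_source(tx, Arc::clone(&shutdown));

    let mut received: Vec<u64> = rx.iter().take(wanted).collect();

    // Signal shutdown, then drain what is still in flight so a sender
    // blocked on a full channel can observe the flag and exit.
    shutdown.store(true, Ordering::SeqCst);
    received.extend(rx.iter());

    let sent = handle.join().expect("source thread panicked");
    (received, sent)
}
```

Draining the channel after setting the flag is the important detail: without it, a source blocked on a full channel would never get to check the shutdown signal.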

Trait Implementations#

The EventSource trait is the primary extensibility point. Implementations must provide a unique name, declare supported capabilities, and implement the async start and stop methods; health_check is optional and defaults to Ok(()). All implementations must be thread-safe (Send + Sync) to support concurrent operation.

Example implementation:

#[async_trait]
impl EventSource for MySource {
    fn name(&self) -> &'static str { "my-source" }
    fn capabilities(&self) -> SourceCaps { SourceCaps::PROCESS }
    async fn start(
        &self,
        tx: mpsc::Sender<CollectionEvent>,
        shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()> {
        // Collection logic: send events on `tx` until `shutdown_signal` is set
        Ok(())
    }
    async fn stop(&self) -> anyhow::Result<()> { Ok(()) }
}

[collector-core/source.rs]

Capability Management#

Capabilities are managed using the SourceCaps bitflags, allowing each event source to declare its supported features, such as PROCESS, NETWORK, FILESYSTEM, PERFORMANCE, REALTIME, KERNEL_LEVEL, and SYSTEM_WIDE. The collector aggregates these capabilities to provide a unified view, which is exposed via IPC for negotiation and task validation [collector-core/source.rs; collector-core/ipc.rs].

Example:

let caps = SourceCaps::PROCESS | SourceCaps::REALTIME;
if caps.contains(SourceCaps::PROCESS) { /* ... */ }
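
The aggregation step, where the collector combines the capabilities of all registered sources into one view, might look like the following sketch. `Caps` is a hand-rolled stand-in for SourceCaps so the example needs no external crate; the bit positions and the `aggregate` function are assumptions, not the framework's definitions.

```rust
use std::ops::BitOr;

/// Hand-rolled stand-in for the `SourceCaps` bitflags (bit values are assumed).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Caps(u32);

impl Caps {
    pub const NONE: Caps = Caps(0);
    pub const PROCESS: Caps = Caps(1 << 0);
    pub const NETWORK: Caps = Caps(1 << 1);
    pub const FILESYSTEM: Caps = Caps(1 << 2);
    pub const PERFORMANCE: Caps = Caps(1 << 3);
    pub const REALTIME: Caps = Caps(1 << 4);

    /// True when every bit set in `other` is also set in `self`.
    pub fn contains(self, other: Caps) -> bool {
        (self.0 & other.0) == other.0
    }
}

impl BitOr for Caps {
    type Output = Caps;
    fn bitor(self, rhs: Caps) -> Caps {
        Caps(self.0 | rhs.0)
    }
}

/// Union the capabilities declared by every registered source.
pub fn aggregate(sources: &[Caps]) -> Caps {
    sources.iter().copied().fold(Caps::NONE, |acc, caps| acc | caps)
}
```

A task that requires, say, `PROCESS | NETWORK` can then be validated with a single `contains` call against the aggregated value.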

Configuration Options#

The CollectorConfig struct manages runtime configuration, including:

  • max_event_sources: Maximum number of event sources (default: 16)
  • event_buffer_size: Buffer size for event channels (default: 1000)
  • shutdown_timeout: Graceful shutdown timeout (default: 30s)
  • health_check_interval: Health check interval (default: 60s)
  • startup_timeout: Event source startup timeout (default: 10s)
  • enable_debug_logging: Enable detailed logging
  • max_batch_size: Maximum events per batch (default: 100)
  • batch_timeout: Batch flush timeout (default: 100ms)
  • backpressure_threshold: Backpressure threshold (default: 80% of buffer size)
  • max_backpressure_wait: Max wait for backpressure (default: 500ms)
  • enable_telemetry: Enable telemetry collection (default: true)
  • telemetry_interval: Telemetry interval (default: 30s)
  • component_name: Name for configuration loading

CollectorConfig supports default values, validation, builder-style setters, and hierarchical loading from system/user files and environment variables via the daemoneye-lib ConfigLoader [collector-core/config.rs].

Example:

let config = CollectorConfig::default()
    .with_max_event_sources(4)
    .with_debug_logging(true);
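
As a worked example of the defaults listed above, the sketch below models a few of the documented fields and derives the backpressure threshold from the buffer size (80% of 1000 is 800). `SketchConfig` is a cut-down stand-in, and the validation rules are assumptions chosen for illustration; the real CollectorConfig may check different things.

```rust
use std::time::Duration;

/// Cut-down stand-in for `CollectorConfig` covering a few documented fields.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub max_event_sources: usize,
    pub event_buffer_size: usize,
    pub max_batch_size: usize,
    pub batch_timeout: Duration,
    pub backpressure_threshold: usize,
}

impl Default for SketchConfig {
    fn default() -> Self {
        let event_buffer_size = 1000;
        Self {
            max_event_sources: 16,
            event_buffer_size,
            max_batch_size: 100,
            batch_timeout: Duration::from_millis(100),
            // 80% of the buffer size: 1000 * 80 / 100 = 800.
            backpressure_threshold: event_buffer_size * 80 / 100,
        }
    }
}

impl SketchConfig {
    /// Builder-style setter in the same spirit as the documented ones.
    pub fn with_max_event_sources(mut self, max: usize) -> Self {
        self.max_event_sources = max;
        self
    }

    /// Assumed validation rules; the real checks may differ.
    pub fn validate(&self) -> Result<(), String> {
        if self.max_event_sources == 0 {
            return Err("max_event_sources must be at least 1".into());
        }
        if self.event_buffer_size == 0 {
            return Err("event_buffer_size must be at least 1".into());
        }
        if self.backpressure_threshold > self.event_buffer_size {
            return Err("backpressure_threshold exceeds event_buffer_size".into());
        }
        Ok(())
    }
}
```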

Integration with procmond#

Integration with procmond is achieved by implementing the EventSource trait in ProcessEventSource, which wraps the existing process collection logic and database operations. ProcessEventSource is registered with the collector, enabling unified event collection and lifecycle management [procmond/event_source.rs].

Example:

let process_source = ProcessEventSource::with_config(db_manager, process_config);
let mut collector = Collector::new(collector_config);
collector.register(Box::new(process_source))?;
collector.run().await?;

ProcessEventSource demonstrates dynamic capability management (e.g., adding REALTIME if the collection interval is short), flexible configuration, and emits CollectionEvent variants for unified processing.

Multi-Domain Event Collection#

The framework supports multi-domain event collection by allowing concurrent registration and management of diverse event sources, each emitting their own domain-specific events. The unified CollectionEvent model and aggregated capabilities enable seamless handling and routing of events from process, network, filesystem, and performance domains [collector-core/event.rs].
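
Routing on a unified event enum can be sketched as follows. The `Event` type here is a simplified stand-in for CollectionEvent with made-up payload fields, and `domain` and `count_by_domain` are hypothetical helpers; the point is only that a single `match` covers every domain and the compiler flags any variant that is not handled.

```rust
use std::collections::HashMap;

/// Simplified stand-in for `CollectionEvent`; payload fields are illustrative.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Process { pid: u32 },
    Network { port: u16 },
    Filesystem { path: String },
    Performance { cpu_percent: f32 },
}

impl Event {
    /// Name of the monitoring domain this event belongs to.
    pub fn domain(&self) -> &'static str {
        match self {
            Event::Process { .. } => "process",
            Event::Network { .. } => "network",
            Event::Filesystem { .. } => "filesystem",
            Event::Performance { .. } => "performance",
        }
    }
}

/// Tally a mixed stream of events by domain, as a router or metrics sink might.
pub fn count_by_domain(events: &[Event]) -> HashMap<&'static str, usize> {
    let mut counts = HashMap::new();
    for event in events {
        *counts.entry(event.domain()).or_insert(0) += 1;
    }
    counts
}
```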

TriggerableCollector Trait#

The TriggerableCollector trait defines collectors that respond to external trigger events, in contrast to EventSource implementations, which produce events on their own schedule. It is the reactive counterpart to the enumeration-path collectors and provides the extension point for on-demand analysis tasks [collector-core/src/triggerable.rs].

The trait requires:

  • name(): stable collector identifier matching the trigger request's target_collector field
  • supported_analysis_types(): list of analysis types the collector can handle
  • handle_trigger(): async method that consumes a TriggerRequest and produces an AnalysisResult
  • health_check(): optional health check (default returns Ok)

TriggerableCollector integrates with the collector lifecycle through the runtime's TriggerManager, which dispatches requests to the matching collector by name. Responses flow via oneshot::Sender rather than polluting the event bus.
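
The dispatch-by-name flow can be sketched synchronously with the standard library. Everything here is a simplified assumption: `Triggerable`, `Manager`, `Outcome`, and `EchoHasher` are hypothetical stand-ins, analysis types are plain strings, `handle_trigger` is not async, and a `std::sync::mpsc::Sender` used once plays the role of the oneshot reply channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Simplified trigger request; field names other than `target_collector` are assumed.
pub struct TriggerRequest {
    pub target_collector: String,
    pub analysis_type: String,
    pub path: String,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Done(String),
    UnknownCollector,
    UnsupportedAnalysis,
}

/// Synchronous stand-in for the `TriggerableCollector` trait.
pub trait Triggerable {
    fn name(&self) -> &'static str;
    fn supported_analysis_types(&self) -> Vec<&'static str>;
    fn handle_trigger(&self, req: &TriggerRequest) -> String;
}

/// Stand-in for the runtime's `TriggerManager`.
pub struct Manager {
    collectors: HashMap<&'static str, Box<dyn Triggerable>>,
}

impl Manager {
    pub fn new() -> Self {
        Self { collectors: HashMap::new() }
    }

    pub fn register(&mut self, collector: Box<dyn Triggerable>) {
        self.collectors.insert(collector.name(), collector);
    }

    /// Route the request to the collector named in it and reply on a
    /// dedicated channel instead of the shared event stream.
    pub fn dispatch(&self, req: TriggerRequest, reply: Sender<Outcome>) {
        let outcome = match self.collectors.get(req.target_collector.as_str()) {
            None => Outcome::UnknownCollector,
            Some(c) if !c
                .supported_analysis_types()
                .iter()
                .any(|t| *t == req.analysis_type.as_str()) =>
            {
                Outcome::UnsupportedAnalysis
            }
            Some(c) => Outcome::Done(c.handle_trigger(&req)),
        };
        // The requester may have gone away; ignoring the send error is fine here.
        let _ = reply.send(outcome);
    }
}

/// Toy collector that echoes the path instead of hashing it.
pub struct EchoHasher;

impl Triggerable for EchoHasher {
    fn name(&self) -> &'static str { "binary-hasher" }
    fn supported_analysis_types(&self) -> Vec<&'static str> { vec!["binary-hash"] }
    fn handle_trigger(&self, req: &TriggerRequest) -> String {
        format!("hashed:{}", req.path)
    }
}
```

Keeping the reply on its own channel means a slow or failed analysis never interleaves with regular collection events.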

BinaryHasherCollector#

BinaryHasherCollector is the first implementation of TriggerableCollector, providing cryptographic integrity verification for executables. It handles AnalysisType::BinaryHash requests and produces HashResult values containing cryptographically secure hashes [collector-core/src/binary_hasher.rs].

Key features:

  • Multi-algorithm hashing: uses MultiAlgorithmHasher for streaming SHA-256 and BLAKE3 hash computation (SHA-3 available via sha3-hashes feature flag)
  • cap-std confinement: TOCTOU-resistant file access via Dir handles opened before privilege drop; all request paths are resolved relative to authorized roots
  • Defense-in-depth authorization: mandatory non-empty allowed_roots with deny-on-empty validation, path-length cap (4096 bytes), parent-traversal rejection, symlink rejection (default), and file-size limits
  • Resource bounds: configurable per-file timeout (default 10s), maximum file size (default 512 MiB), and concurrent operation cap

The collector is configured via BinaryHasherConfig, which provides builder methods and platform-specific secure defaults (with_platform_defaults populates allow-lists for /usr/bin, /usr/local/bin, etc. on Unix; %SystemRoot%\System32 and %ProgramFiles% on Windows).
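
Some of the authorization checks listed above (deny-on-empty roots, the 4096-byte path cap, and parent-traversal rejection) can be illustrated with a purely lexical sketch. This is not how BinaryHasherCollector is implemented: it does not use cap-std, does not touch the filesystem, and does not cover symlink or file-size checks. `authorize` and `Denied` are hypothetical names.

```rust
use std::path::{Component, Path, PathBuf};

/// Path-length cap taken from the feature list above.
const MAX_PATH_BYTES: usize = 4096;

#[derive(Debug, PartialEq)]
pub enum Denied {
    NoAllowedRoots,
    PathTooLong,
    ParentTraversal,
    OutsideAllowedRoots,
}

/// Lexical pre-checks on a requested path. Returns the allowed root that
/// contains it, or the reason the request is denied.
pub fn authorize<'a>(allowed_roots: &'a [PathBuf], requested: &Path) -> Result<&'a Path, Denied> {
    // Deny-on-empty: an empty allow-list authorizes nothing.
    if allowed_roots.is_empty() {
        return Err(Denied::NoAllowedRoots);
    }
    if requested.as_os_str().len() > MAX_PATH_BYTES {
        return Err(Denied::PathTooLong);
    }
    // Reject any `..` component rather than trying to normalize it away.
    if requested.components().any(|c| matches!(c, Component::ParentDir)) {
        return Err(Denied::ParentTraversal);
    }
    // `Path::starts_with` compares whole components, so `/usr/binary`
    // does not match the root `/usr/bin`.
    allowed_roots
        .iter()
        .map(PathBuf::as_path)
        .find(|root| requested.starts_with(root))
        .ok_or(Denied::OutsideAllowedRoots)
}
```

Lexical checks like these are only a first layer; resolving the path relative to a pre-opened directory handle, as the feature list describes, is what closes the gap between checking a path and opening it.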

Shared OSS and Enterprise Features#

Shared features are supported through modular configuration, dynamic capability management, and extensible design. The compute_executable_hashes configuration option is fully implemented: when enabled, it activates the BinaryHasherCollector and a post-enumeration parallel hash pass (via hash_pass::populate_hashes) that deduplicates and hashes unique executables. Hash computation is bounded by a 60-second request-scoped deadline. The framework supports both SHA-256 and BLAKE3 algorithms by default, with SHA-3 available as an opt-in via the sha3-hashes feature flag. Legacy algorithms (SHA-1, MD5) are deliberately not supported per AGENTS.md cryptographic standards [procmond/event_source.rs, procmond/src/hash_pass.rs].
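
The deduplicate-then-hash idea behind the post-enumeration pass can be sketched as below. This is a sequential illustration, not the parallel hash_pass::populate_hashes implementation: `populate_hashes_sketch` is a hypothetical function, the hashing itself is delegated to a caller-supplied closure, and the deadline is checked only between files.

```rust
use std::collections::{BTreeSet, HashMap};
use std::time::{Duration, Instant};

/// Hash each unique executable path at most once, stopping when the
/// request-scoped budget is spent. `hash_one` returns `None` when a
/// file cannot be hashed.
pub fn populate_hashes_sketch<F>(
    exe_paths: &[&str],
    budget: Duration,
    mut hash_one: F,
) -> HashMap<String, String>
where
    F: FnMut(&str) -> Option<String>,
{
    let deadline = Instant::now() + budget;

    // Many processes share one executable; collapse duplicates first.
    let unique: BTreeSet<&str> = exe_paths.iter().copied().collect();

    let mut hashes = HashMap::new();
    for path in unique {
        if Instant::now() >= deadline {
            break; // budget exhausted: remaining paths stay unhashed
        }
        if let Some(digest) = hash_one(path) {
            hashes.insert(path.to_string(), digest);
        }
    }
    hashes
}
```

Called with the 60-second deadline mentioned above, usage would look like `populate_hashes_sketch(&paths, Duration::from_secs(60), hasher)`, where `hasher` wraps whichever digest routine is in use.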


For further details, see the collector-core source and procmond integration.
