Overview#
ProcessEventSource is a component in the procmond module that implements the EventSource trait for the collector-core framework. It is responsible for collecting process information from the system and streaming it as events to the collector runtime. This component wraps the existing process collection logic (formerly handled by ProcessMessageHandler and IPC servers) and integrates it directly into the unified event source model of collector-core, eliminating the need for separate IPC communication for process data in procmond procmond/src/event_source.rs.
Architecture#
Role within collector-core#
ProcessEventSource acts as a bridge between the process enumeration logic and the collector-core event streaming system. It implements the EventSource trait, providing methods for capability negotiation, event streaming, health checks, and lifecycle management. When registered with a Collector, it periodically enumerates system processes, converts them into ProcessEvent objects, and sends them as CollectionEvents through an asynchronous channel. The collector runtime manages event batching, backpressure, telemetry, and IPC, providing a unified interface for all event sources collector-core/src/collector.rs.
Configuration and Management#
Configuration Parameters#
Process collection is configured via the ProcessSourceConfig struct, which includes:
collection_interval(Duration): Interval between process collection cycles. Default is 30 seconds.collect_enhanced_metadata(bool): Whether to collect enhanced metadata (e.g., CPU/memory usage, start time). Default istrue.max_processes_per_cycle(usize): Maximum number of processes to collect per cycle.0means unlimited. Default is0.compute_executable_hashes(bool): Whether to compute executable hashes. Default isfalse.
Example:
use procmond::event_source::{ProcessEventSource, ProcessSourceConfig};
use daemoneye_lib::storage::DatabaseManager;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
let db_manager = Arc::new(Mutex::new(DatabaseManager::new("/var/lib/daemoneye/processes.db")?));
let config = ProcessSourceConfig {
collection_interval: Duration::from_secs(60),
collect_enhanced_metadata: false,
max_processes_per_cycle: 1000,
compute_executable_hashes: true,
};
let source = ProcessEventSource::with_config(db_manager, config);
CLI Options#
When running procmond, process collection parameters can be set via CLI options:
--interval <seconds>: Sets the collection interval (default: 30, min: 5, max: 3600).--max_processes <number>: Sets the maximum number of processes to collect per cycle (default: 0, unlimited).--enhanced_metadata: Enables enhanced metadata collection.--compute_hashes: Enables executable hashing.
Example CLI usage:
procmond --interval 60 --max_processes 500 --enhanced_metadata --compute_hashes
These options are parsed and mapped to the ProcessSourceConfig struct during initialization procmond/src/main.rs.
Lifecycle and Event Streaming#
ProcessEventSource is registered with the Collector and started as part of the collector runtime. It runs a loop that collects processes at the configured interval, sending events via a channel, and listens for a shutdown signal to stop gracefully. The collector runtime handles event batching, IPC, telemetry, and health checks for all registered sources collector-core/src/collector.rs.
Migration from IPC Server Setups#
Previously, procmond used a dedicated IPC server and ProcessMessageHandler to communicate process data to other components via protobuf messages. With the introduction of ProcessEventSource and the collector-core framework, process collection is now integrated directly as an event source. This architectural change removes the need for a separate IPC server for process data, centralizes configuration and lifecycle management, and enables unified event streaming, capability negotiation, and telemetry across all sources procmond/src/event_source.rs procmond/src/lib.rs.
Example: Registering ProcessEventSource#
use procmond::event_source::{ProcessEventSource, ProcessSourceConfig};
use daemoneye_lib::storage::DatabaseManager;
use collector_core::{Collector, CollectorConfig};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let db_manager = Arc::new(Mutex::new(DatabaseManager::new("/var/lib/daemoneye/processes.db")?));
let process_config = ProcessSourceConfig {
collection_interval: Duration::from_secs(30),
collect_enhanced_metadata: true,
max_processes_per_cycle: 0,
compute_executable_hashes: false,
};
let process_source = ProcessEventSource::with_config(db_manager, process_config);
let collector_config = CollectorConfig::default().with_component_name("procmond".to_string());
let mut collector = Collector::new(collector_config);
collector.register(Box::new(process_source))?;
collector.run().await?;
Ok(())
}
Summary Table: Key Parameters#
| Parameter | CLI Option | Config Field | Default | Description |
|---|---|---|---|---|
| Collection Interval | --interval | collection_interval | 30 seconds | Time between process scans |
| Max Processes per Cycle | --max_processes | max_processes_per_cycle | 0 (unlimited) | Maximum processes to collect per scan |
| Enhanced Metadata | --enhanced_metadata | collect_enhanced_metadata | true | Collect CPU/memory, start time, etc. |
| Executable Hashing | --compute_hashes | compute_executable_hashes | false | Compute hashes of process executables |
For further details, see the ProcessEventSource implementation and procmond CLI setup.