Core Monitoring
Type: External
Status: Published
Created: Mar 8, 2026
Updated: Apr 3, 2026
Updated by: Dosu Bot

Overview#

The Core Monitoring specification defines the fundamental process monitoring capabilities that form the foundation of DaemonEye. This includes process enumeration, executable integrity verification, SQL-based detection engine, and multi-channel alerting across the three-component architecture.

Process Collection Architecture#

Cross-Platform Process Enumeration#

DaemonEye uses a layered approach to process enumeration, providing a unified interface across different operating systems while allowing platform-specific optimizations.

Base Implementation (sysinfo crate)#

Primary Interface: The sysinfo crate provides cross-platform process enumeration with consistent data structures.

use std::time::{SystemTime, UNIX_EPOCH};
use sysinfo::{ProcessExt, System, SystemExt};
use uuid::Uuid;

pub struct ProcessCollector {
    system: System,
    config: CollectorConfig,
    hash_computer: Sha256HashComputer,
}

impl ProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        // refresh_processes mutates the cached process table, so &mut self is required
        self.system.refresh_processes();
        let mut processes = Vec::new();
        let collection_time =
            i64::try_from(SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis())
                .map_err(|_| "Timestamp overflow: system time exceeds i64::MAX milliseconds")?;

        for (pid, process) in self.system.processes() {
            let process_record = ProcessRecord {
                id: Uuid::new_v4(),
                scan_id: self.get_current_scan_id(),
                collection_time,
                pid: pid.as_u32(),
                ppid: process.parent().map(|p| p.as_u32()),
                name: process.name().to_string(),
                executable_path: process.exe().map(|p| p.to_path_buf()),
                command_line: process.cmd().to_vec(),
                start_time: process.start_time(),
                cpu_usage: process.cpu_usage(),
                memory_usage: Some(process.memory()),
                executable_hash: self.compute_executable_hash(process.exe()).await?,
                hash_algorithm: Some("sha256".to_string()),
                user_id: self.get_process_user(pid).await?,
                accessible: true,
                file_exists: process.exe().map(|p| p.exists()).unwrap_or(false),
                platform_data: self.get_platform_specific_data(pid).await?,
            };
            processes.push(process_record);
        }
        Ok(processes)
    }
}

Platform-Specific Enhancements#

Linux eBPF Integration (Enterprise Tier):

#[cfg(target_os = "linux")]
pub struct EbpfProcessCollector {
    base_collector: ProcessCollector,
    ebpf_monitor: Option<EbpfMonitor>,
}

impl EbpfProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        if let Some(ebpf) = &self.ebpf_monitor {
            return self.enumerate_with_ebpf(ebpf).await;
        }
        self.base_collector.enumerate_processes().await
    }
}

Windows ETW Integration (Enterprise Tier):

#[cfg(target_os = "windows")]
pub struct EtwProcessCollector {
    base_collector: ProcessCollector,
    etw_monitor: Option<EtwMonitor>,
}

impl EtwProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        if let Some(etw) = &self.etw_monitor {
            return self.enumerate_with_etw(etw).await;
        }
        self.base_collector.enumerate_processes().await
    }
}

macOS EndpointSecurity Integration (Enterprise Tier):

#[cfg(target_os = "macos")]
pub struct EndpointSecurityProcessCollector {
    base_collector: ProcessCollector,
    es_monitor: Option<EndpointSecurityMonitor>,
}

impl EndpointSecurityProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        if let Some(es) = &self.es_monitor {
            return self.enumerate_with_endpoint_security(es).await;
        }
        self.base_collector.enumerate_processes().await
    }
}

Executable Integrity Verification#

Hash Computation: SHA-256 hashing of executable files for integrity verification.

use sha2::{Digest, Sha256};
use std::io::Read;
use std::path::Path;

pub trait HashComputer: Send + Sync {
    fn compute_hash(&self, path: &Path) -> Result<Option<String>>;
    fn get_algorithm(&self) -> &'static str;
    fn buffer_size(&self) -> usize;
    fn set_buffer_size(&mut self, size: usize);
}

#[derive(Clone)]
pub struct Sha256HashComputer {
    buffer_size: usize,
}

impl Sha256HashComputer {
    pub fn new(buffer_size: usize) -> Self {
        Self { buffer_size }
    }
}

impl HashComputer for Sha256HashComputer {
    fn compute_hash(&self, path: &Path) -> Result<Option<String>> {
        if !path.exists() {
            return Ok(None);
        }
        let mut file = std::fs::File::open(path)?;
        let mut hasher = Sha256::new();
        let mut buffer = vec![0u8; self.buffer_size];
        loop {
            let bytes_read = file.read(&mut buffer)?;
            if bytes_read == 0 { break; }
            hasher.update(&buffer[..bytes_read]);
        }
        let hash = hasher.finalize();
        Ok(Some(format!("{:x}", hash)))
    }

    fn get_algorithm(&self) -> &'static str { "sha256" }
    fn buffer_size(&self) -> usize { self.buffer_size }
    fn set_buffer_size(&mut self, size: usize) { self.buffer_size = size; }
}

Performance Optimization: Asynchronous hash computation with configurable buffer sizes.

impl ProcessCollector {
    async fn compute_executable_hash(&self, path: Option<&Path>) -> Result<Option<String>> {
        let path = match path {
            Some(p) => p,
            None => return Ok(None),
        };
        if self.should_skip_hashing(path) {
            return Ok(None);
        }
        let hash_computer = self.hash_computer.clone();
        let path = path.to_path_buf();
        tokio::task::spawn_blocking(move || hash_computer.compute_hash(&path)).await?
    }

    fn should_skip_hashing(&self, path: &Path) -> bool {
        let path_str = path.to_string_lossy();
        path_str.contains("/proc/")
            || path_str.contains("/sys/")
            || path_str.contains("/tmp/")
            || path_str.contains("\\System32\\")
    }
}

SQL-Based Detection Engine#

SQL Validation and Security#

AST Validation: Comprehensive SQL parsing and validation to prevent injection attacks.

use sqlparser::{ast::Statement, dialect::SQLiteDialect, parser::Parser};
use std::collections::HashSet;

pub struct SqlValidator {
    dialect: SQLiteDialect,
    allowed_functions: HashSet<String>,
    allowed_operators: HashSet<String>,
}

impl SqlValidator {
    pub fn validate_query(&self, sql: &str) -> Result<ValidationResult> {
        // Parser::parse_sql is an associated function that borrows the dialect;
        // only SELECT statements are permitted.
        let ast = Parser::parse_sql(&self.dialect, sql)?;
        for statement in &ast {
            match statement {
                Statement::Query(query) => self.validate_select_query(query)?,
                _ => return Err(ValidationError::ForbiddenStatement),
            }
        }
        Ok(ValidationResult::Valid)
    }
}

The allowed functions include: count, sum, avg, min, max, length, substr, upper, lower, datetime, strftime, unixepoch, coalesce, nullif, ifnull.
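A minimal sketch of the allowlist check itself, assuming the function names have already been extracted from the parsed AST; the `functions_allowed` helper and its inputs are illustrative, not part of the real validator:

```rust
use std::collections::HashSet;

// Hypothetical helper: checks extracted function names against the allowlist.
// In the real validator these names would come from walking the parsed AST.
fn functions_allowed(called: &[&str], allowed: &HashSet<&str>) -> Result<(), String> {
    for f in called {
        // SQL function names are case-insensitive, so normalize before lookup.
        let lower = f.to_ascii_lowercase();
        if !allowed.contains(lower.as_str()) {
            return Err(format!("forbidden function: {f}"));
        }
    }
    Ok(())
}

fn main() {
    let allowed: HashSet<&str> = [
        "count", "sum", "avg", "min", "max", "length", "substr", "upper",
        "lower", "datetime", "strftime", "unixepoch", "coalesce", "nullif", "ifnull",
    ]
    .into_iter()
    .collect();

    // COUNT and UPPER are allowed; anything off-list is rejected.
    assert!(functions_allowed(&["COUNT", "UPPER"], &allowed).is_ok());
    assert!(functions_allowed(&["load_extension"], &allowed).is_err());
}
```

Rejecting anything not on the list (rather than blocklisting known-bad names) keeps the validator fail-closed when SQLite adds new built-ins.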

Detection Rule Execution#

Sandboxed Execution: Safe execution of detection rules with resource limits.

pub struct DetectionEngine {
    db: redb::Database,
    sql_validator: SqlValidator,
    rule_manager: RuleManager,
    alert_manager: AlertManager,
}

impl DetectionEngine {
    pub async fn execute_rules(&self, scan_id: i64) -> Result<Vec<Alert>> {
        let rules = self.rule_manager.load_enabled_rules().await?;
        let mut alerts = Vec::new();
        for rule in rules {
            match self.execute_rule(&rule, scan_id).await {
                Ok(rule_alerts) => alerts.extend(rule_alerts),
                Err(e) => {
                    tracing::error!(rule_id = %rule.id, error = %e, "Failed to execute detection rule");
                }
            }
        }
        Ok(alerts)
    }

    async fn execute_rule(&self, rule: &DetectionRule, scan_id: i64) -> Result<Vec<Alert>> {
        self.sql_validator.validate_query(&rule.sql_query)?;
        let execution_result = tokio::time::timeout(
            Duration::from_secs(30),
            self.execute_sql_query(&rule.sql_query, scan_id),
        ).await??;

        let mut alerts = Vec::new();
        for row in execution_result.rows {
            let alert = self.alert_manager.generate_alert(&rule, &row, scan_id).await?;
            if let Some(alert) = alert {
                alerts.push(alert);
            }
        }
        Ok(alerts)
    }
}

Alert Generation and Management#

Alert Data Model#

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    pub id: Uuid,
    pub alert_time: i64,
    pub rule_id: String,
    pub title: String,
    pub description: String,
    pub severity: AlertSeverity,
    pub scan_id: Option<i64>,
    pub affected_processes: Vec<u32>,
    pub process_count: i32,
    pub alert_data: serde_json::Value,
    pub rule_execution_time_ms: Option<i64>,
    pub dedupe_key: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
    Low,
    Medium,
    High,
    Critical,
}

Alert Deduplication#

Intelligent Deduplication: Prevents alert spam while maintaining security visibility.

pub struct AlertManager {
    db: redb::Database,
    dedupe_cache: Arc<Mutex<HashMap<String, Instant>>>,
    dedupe_window: Duration,
}

impl AlertManager {
    pub async fn generate_alert(
        &self, rule: &DetectionRule, process_data: &ProcessRow, scan_id: Option<i64>,
    ) -> Result<Option<Alert>> {
        let alert = Alert::new(rule, process_data, scan_id);
        if self.is_duplicate(&alert).await? {
            return Ok(None);
        }
        self.store_alert(&alert).await?;
        self.update_dedupe_cache(&alert).await?;
        Ok(Some(alert))
    }
}
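The window check behind `is_duplicate` can be sketched with std types only; the `DedupeCache` struct and the `rule-id:pid` key format below are assumptions, not the actual implementation:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Minimal sketch of a dedupe-window check, assuming dedupe_key is derived
// from the rule id plus the affected process identifiers.
struct DedupeCache {
    seen: HashMap<String, Instant>,
    window: Duration,
}

impl DedupeCache {
    fn new(window: Duration) -> Self {
        Self { seen: HashMap::new(), window }
    }

    /// Returns true if the key was seen within the window; otherwise records it.
    fn is_duplicate(&mut self, key: &str) -> bool {
        let now = Instant::now();
        if let Some(&last) = self.seen.get(key) {
            if now.duration_since(last) < self.window {
                return true;
            }
        }
        self.seen.insert(key.to_string(), now);
        false
    }
}

fn main() {
    let mut cache = DedupeCache::new(Duration::from_secs(60));
    assert!(!cache.is_duplicate("rule-42:pid-1000")); // first sighting: deliver
    assert!(cache.is_duplicate("rule-42:pid-1000"));  // within the window: suppress
    assert!(!cache.is_duplicate("rule-42:pid-2000")); // different key: deliver
}
```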

Multi-Channel Alert Delivery#

Alert Sink Architecture#

Pluggable Sinks: Flexible alert delivery through multiple channels.

#[async_trait]
pub trait AlertSink: Send + Sync {
    async fn send(&self, alert: &Alert) -> Result<DeliveryResult>;
    async fn health_check(&self) -> HealthStatus;
    fn name(&self) -> &str;
}

pub struct AlertDeliveryManager {
    sinks: Vec<Box<dyn AlertSink>>,
    retry_policy: RetryPolicy,
    circuit_breaker: CircuitBreaker,
}

impl AlertDeliveryManager {
    pub async fn deliver_alert(&self, alert: &Alert) -> Result<Vec<DeliveryResult>> {
        // Fan out to every sink concurrently; each future borrows the shared alert.
        let delivery_futures: Vec<_> = self.sinks.iter()
            .map(|sink| async move { sink.send(alert).await })
            .collect();
        let results = futures::future::join_all(delivery_futures).await;
        Ok(results.into_iter()
            .map(|r| match r {
                Ok(dr) => dr,
                Err(e) => DeliveryResult::Failed(e.to_string()),
            })
            .collect())
    }
}
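The `retry_policy` field is declared but not exercised above; one common shape is capped exponential backoff. A sketch under that assumption (the `backoff_delays` helper is hypothetical):

```rust
use std::time::Duration;

// Sketch of a capped exponential backoff schedule: double the delay each
// attempt, never exceeding `max`. The doubling policy is an assumption.
fn backoff_delays(base: Duration, max: Duration, attempts: u32) -> Vec<Duration> {
    (0..attempts)
        .map(|i| {
            // checked_mul avoids overflow for large attempt counts.
            let d = base.checked_mul(2u32.pow(i)).unwrap_or(max);
            d.min(max)
        })
        .collect()
}

fn main() {
    let delays = backoff_delays(Duration::from_millis(100), Duration::from_secs(5), 4);
    assert_eq!(delays[0], Duration::from_millis(100));
    assert_eq!(delays[1], Duration::from_millis(200));
    assert_eq!(delays[3], Duration::from_millis(800));
}
```

A failed delivery would be retried on this schedule until the circuit breaker trips for that sink.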

Specific Sink Implementations#

Supported sinks include:

  • Stdout Sink: Outputs alerts in JSON, text, or CSV format to standard output
  • Syslog Sink: Sends alerts via Unix datagram sockets with severity-mapped priority levels
  • Webhook Sink: Posts JSON alert payloads to configured HTTP endpoints with timeout and retry support

Each sink implements health checks and integrates with the circuit breaker pattern for reliability.
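A simplified, synchronous sketch of a stdout sink; the real trait is async, and the `Alert` fields here are a cut-down stand-in for the full model:

```rust
// Cut-down stand-in for the full Alert model (illustrative fields only).
struct Alert {
    rule_id: String,
    title: String,
    severity: String,
}

// Synchronous stand-in for the async AlertSink trait.
trait Sink {
    fn send(&self, alert: &Alert) -> Result<(), String>;
    fn name(&self) -> &str;
}

struct StdoutSink;

impl Sink for StdoutSink {
    fn send(&self, alert: &Alert) -> Result<(), String> {
        // Emit one JSON-ish line per alert (hand-built to stay dependency-free).
        println!(
            "{{\"rule_id\":\"{}\",\"title\":\"{}\",\"severity\":\"{}\"}}",
            alert.rule_id, alert.title, alert.severity
        );
        Ok(())
    }

    fn name(&self) -> &str {
        "stdout"
    }
}

fn main() {
    let alert = Alert {
        rule_id: "suspicious-shell".into(),
        title: "Unexpected shell spawn".into(),
        severity: "high".into(),
    };
    assert!(StdoutSink.send(&alert).is_ok());
    assert_eq!(StdoutSink.name(), "stdout");
}
```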

Performance Requirements and Optimization#

Process Collection Performance#

Target Metrics:

  • Process Enumeration: <5 seconds for 10,000+ processes
  • CPU Usage: <5% sustained during continuous monitoring
  • Memory Usage: <100MB resident under normal operation
  • Hash Computation: Complete within enumeration time

Detection Engine Performance#

Target Metrics:

  • Rule Execution: <100ms per detection rule
  • SQL Validation: <10ms per query
  • Resource Limits: 30-second timeout, memory limits
  • Concurrent Execution: Parallel rule processing

The engine groups rules by complexity for optimal scheduling, executing simple rules in parallel and complex rules sequentially to avoid resource contention.
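The grouping step can be sketched as a partition over an assumed per-rule complexity score; the `Rule` shape, the scoring, and the threshold are all illustrative:

```rust
// Illustrative rule shape with an assumed precomputed complexity score.
struct Rule {
    id: String,
    complexity: u32,
}

// Split rules into (simple, complex) around a complexity threshold: the
// simple group can run in parallel, the complex group sequentially.
fn partition_rules(rules: Vec<Rule>, threshold: u32) -> (Vec<Rule>, Vec<Rule>) {
    rules.into_iter().partition(|r| r.complexity <= threshold)
}

fn main() {
    let rules = vec![
        Rule { id: "new-process-name".into(), complexity: 2 }, // simple
        Rule { id: "cross-table-join".into(), complexity: 9 }, // complex
    ];
    let (simple, complex) = partition_rules(rules, 5);
    assert_eq!(simple.len(), 1);
    assert_eq!(complex[0].id, "cross-table-join");
}
```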

Error Handling and Recovery#

Graceful Degradation#

  • Process Collection Failures: Enhanced enumeration falls back to basic sysinfo collection
  • Detection Engine Failures: Problematic rules are automatically disabled to prevent repeated failures
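The collection fallback can be sketched generically; both closures below stand in for the enhanced (eBPF/ETW/EndpointSecurity) and basic sysinfo collectors:

```rust
// Try the enhanced collector first; on error, log and fall back to the
// basic path. Both collectors here are illustrative stand-ins.
fn collect_with_fallback<T>(
    enhanced: impl Fn() -> Result<Vec<T>, String>,
    basic: impl Fn() -> Result<Vec<T>, String>,
) -> Result<Vec<T>, String> {
    enhanced().or_else(|e| {
        eprintln!("enhanced enumeration failed ({e}); falling back to sysinfo");
        basic()
    })
}

fn main() {
    // Simulate an unavailable eBPF backend; the basic path still succeeds.
    let result = collect_with_fallback(
        || Err::<Vec<u32>, _>("ebpf unavailable".into()),
        || Ok(vec![1, 2, 3]),
    );
    assert_eq!(result.unwrap(), vec![1, 2, 3]);
}
```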

Resource Management#

Memory Pressure Handling: When memory usage exceeds the configured threshold, the system reduces hash computation buffer sizes (with a minimum of 4KB) and triggers cooperative yielding.
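The buffer reduction with its 4 KB floor might look like the following; the halving step is an assumption, only the floor comes from the text:

```rust
// 4 KiB floor from the spec; the halving step per pressure event is assumed.
const MIN_BUFFER: usize = 4 * 1024;

fn reduced_buffer_size(current: usize) -> usize {
    (current / 2).max(MIN_BUFFER)
}

fn main() {
    assert_eq!(reduced_buffer_size(64 * 1024), 32 * 1024);
    assert_eq!(reduced_buffer_size(6 * 1024), 4 * 1024); // clamped to the floor
    assert_eq!(reduced_buffer_size(4 * 1024), 4 * 1024); // never below 4 KiB
}
```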

This core monitoring specification provides the foundation for DaemonEye's process monitoring capabilities, ensuring high performance, security, and reliability across all supported platforms.