Overview#
The daemoneye-eventbus embedded broker runs inside the daemoneye-agent process, so the agent has no external message broker dependency. It provides topic-based publish/subscribe messaging that the agent uses to coordinate multiple collectors.
Embedded Broker Architecture#
Core Components#
The embedded broker consists of:
- DaemoneyeBroker: Topic routing, subscription management, message delivery, and statistics tracking
- TopicMatcher: Wildcard matching, subscriber routing, and pattern validation
- Transport Layer: Unix sockets, named pipes, and connection pooling
These integrate with agent components including the Detection Engine (SQL rule execution, event correlation, alert generation) and Lifecycle Manager (collector supervision, health monitoring, configuration updates).
External collectors (procmond, netmond, fsmond) connect to the transport layer via IPC.
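The wildcard matching handled by TopicMatcher can be pictured with a small sketch. The semantics below are an assumption for illustration, not taken from the daemoneye-eventbus source: topics are dot-separated, and a `*` segment matches exactly one topic segment. The function name `topic_matches` is hypothetical.

```rust
/// Returns true when `topic` matches `pattern`.
/// Assumed semantics (hypothetical): segments are separated by '.',
/// and a `*` segment matches exactly one topic segment.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    let mut pattern_segments = pattern.split('.');
    let mut topic_segments = topic.split('.');
    loop {
        match (pattern_segments.next(), topic_segments.next()) {
            // Both sides ran out together: every segment matched.
            (None, None) => return true,
            // A wildcard consumes one segment, whatever it contains.
            (Some("*"), Some(_)) => continue,
            // Literal segments must be identical.
            (Some(p), Some(t)) if p == t => continue,
            // Differing literal, or one side is longer than the other.
            _ => return false,
        }
    }
}
```

Under these assumed rules, `events.*` matches `events.process` but not `events.process.new`; a real matcher may also support a multi-segment wildcard.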
Deployment Architecture#
In-Process Deployment#
The embedded broker runs as a component within the daemoneye-agent process:
// daemoneye-agent startup sequence
use std::sync::Arc;
use std::time::Duration;
pub struct DaemoneyeAgent {
// Embedded broker instance
event_broker: Arc<DaemoneyeBroker>,
// Detection engine with broker integration
detection_engine: DetectionEngine,
// Collector lifecycle manager
collector_manager: CollectorManager,
// Configuration manager
config: AgentConfig,
}
impl DaemoneyeAgent {
pub async fn new(config: AgentConfig) -> Result<Self> {
// Initialize embedded broker with agent-specific socket path
let socket_path = config
.broker
.socket_path
.clone() // clone so `config` is not partially moved before it is stored in `Self`
.unwrap_or_else(|| format!("/tmp/daemoneye-{}.sock", config.instance_id));
let event_broker = Arc::new(DaemoneyeBroker::new(&socket_path).await?);
// Start broker server
event_broker.start().await?;
// Initialize other components with broker reference
let detection_engine =
DetectionEngine::new(config.detection.clone(), Arc::clone(&event_broker)).await?;
let collector_manager =
CollectorManager::new(config.collectors.clone(), Arc::clone(&event_broker)).await?;
Ok(Self {
event_broker,
detection_engine,
collector_manager,
config,
})
}
pub async fn run(&self) -> Result<()> {
// Start all components concurrently
tokio::try_join!(
self.detection_engine.run(),
self.collector_manager.run(),
self.event_broker_monitor()
)?;
Ok(())
}
async fn event_broker_monitor(&self) -> Result<()> {
// Monitor broker health and statistics
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
let stats = self.event_broker.statistics().await;
tracing::info!(
messages_published = stats.messages_published,
messages_delivered = stats.messages_delivered,
active_subscribers = stats.active_subscribers,
uptime_seconds = stats.uptime_seconds,
"Broker statistics"
);
// Health check logic
if stats.active_subscribers == 0 && stats.uptime_seconds > 60 {
tracing::warn!("No active subscribers after 60 seconds");
}
}
}
}
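The socket path fallback in `new()` can be isolated into a small helper, which makes it easy to test. This is a minimal sketch; `resolve_socket_path` is a hypothetical name, and the `/tmp/daemoneye-{instance_id}.sock` default simply mirrors the startup code above.

```rust
/// Picks the broker socket path: the configured value if present,
/// otherwise a per-instance default under /tmp.
/// (Hypothetical helper; not part of the daemoneye-agent API.)
fn resolve_socket_path(configured: Option<&str>, instance_id: &str) -> String {
    match configured {
        Some(path) => path.to_string(),
        None => format!("/tmp/daemoneye-{}.sock", instance_id),
    }
}
```

Because the helper borrows its inputs, a caller could pass `config.broker.socket_path.as_deref()` and `&config.instance_id` and still own the whole `AgentConfig` afterwards.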
Startup and Configuration Management#
Configuration Structure#
#[derive(Debug, Clone, serde::Deserialize)]
pub struct BrokerConfig {
/// Socket path for IPC communication
pub socket_path: Option<String>,
/// Maximum number of concurrent connections
pub max_connections: usize,
/// Message buffer size per subscriber
pub message_buffer_size: usize,
/// Connection timeout in seconds
pub connection_timeout_secs: u64,
/// Enable message statistics collection
pub enable_statistics: bool,
/// Statistics reporting interval in seconds
pub stats_interval_secs: u64,
/// Maximum message size in bytes
pub max_message_size: usize,
}
impl Default for BrokerConfig {
fn default() -> Self {
Self {
socket_path: None, // Auto-generated based on instance ID
max_connections: 64,
message_buffer_size: 1000,
connection_timeout_secs: 30,
enable_statistics: true,
stats_interval_secs: 30,
max_message_size: 1024 * 1024, // 1MB
}
}
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct AgentConfig {
/// Unique instance identifier
pub instance_id: String,
/// Embedded broker configuration
pub broker: BrokerConfig,
/// Detection engine configuration
pub detection: DetectionConfig,
/// Collector management configuration
pub collectors: CollectorConfig,
}
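Since these values come from deserialized configuration, it may be worth rejecting nonsensical limits before the broker starts. The sketch below is an assumption about what such a check could look like: `BrokerLimits`, `default_limits`, and `validate_limits` are hypothetical names, and the rules are illustrative rather than the agent's actual validation.

```rust
/// Numeric subset of the broker settings (hypothetical mirror type).
#[derive(Debug, Clone)]
struct BrokerLimits {
    max_connections: usize,
    message_buffer_size: usize,
    connection_timeout_secs: u64,
    max_message_size: usize,
}

/// Same values as the `Default` implementation shown above.
fn default_limits() -> BrokerLimits {
    BrokerLimits {
        max_connections: 64,
        message_buffer_size: 1000,
        connection_timeout_secs: 30,
        max_message_size: 1024 * 1024,
    }
}

/// Rejects zero-valued limits, which would leave the broker unusable.
/// The specific rules are illustrative assumptions.
fn validate_limits(limits: &BrokerLimits) -> Result<(), String> {
    if limits.max_connections == 0 {
        return Err("max_connections must be at least 1".to_string());
    }
    if limits.message_buffer_size == 0 {
        return Err("message_buffer_size must be at least 1".to_string());
    }
    if limits.connection_timeout_secs == 0 {
        return Err("connection_timeout_secs must be at least 1".to_string());
    }
    if limits.max_message_size == 0 {
        return Err("max_message_size must be at least 1 byte".to_string());
    }
    Ok(())
}
```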
Startup Sequence#
The startup sequence proceeds as follows:
- main() calls DaemoneyeAgent::new(config)
- The agent initializes the DaemoneyeBroker with the socket path
- The broker initializes and binds the transport socket
- The agent initializes the detection engine and collector manager
- main() calls agent.run()
- Collectors are started and connect to the transport layer via IPC
- System becomes operational
Health Monitoring and Status Reporting#
Health Check Implementation#
#[derive(Debug, Clone, serde::Serialize)]
pub struct BrokerHealthStatus {
/// Broker operational status
pub status: HealthStatus,
/// Number of active connections
pub active_connections: usize,
/// Message processing rate (messages/second)
pub message_rate: f64,
/// Memory usage in bytes
pub memory_usage: usize,
/// Last error message, if any
pub last_error: Option<String>,
/// Uptime in seconds
pub uptime_seconds: u64,
}
impl DaemoneyeBroker {
pub async fn health_status(&self) -> BrokerHealthStatus {
let stats = self.statistics().await;
let clients_count = {
let clients_guard = self.clients.lock().await;
clients_guard.len()
};
// Average message rate over the broker's whole uptime (not a sliding one-minute window)
let message_rate = if stats.uptime_seconds > 0 {
stats.messages_published as f64 / stats.uptime_seconds as f64
} else {
0.0
};
// Determine health status based on metrics
let status = if clients_count == 0 && stats.uptime_seconds > 60 {
HealthStatus::Degraded
} else if stats.messages_published > 0 && stats.messages_delivered == 0 {
HealthStatus::Unhealthy
} else {
HealthStatus::Healthy
};
BrokerHealthStatus {
status,
active_connections: clients_count,
message_rate,
memory_usage: self.estimate_memory_usage().await,
last_error: None, // TODO: Track last error
uptime_seconds: stats.uptime_seconds,
}
}
async fn estimate_memory_usage(&self) -> usize {
let stats = self.statistics().await;
let base_overhead = 1024 * 1024; // 1MB base overhead
let per_subscriber_overhead = 64 * 1024; // 64KB per subscriber
base_overhead + (stats.active_subscribers * per_subscriber_overhead)
}
}
#[derive(Debug, Clone, serde::Serialize)]
pub enum HealthStatus {
Healthy,
Degraded,
Unhealthy,
}
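The `message_rate` computed in `health_status` divides total messages by total uptime, so it is a lifetime average. If a rate over a recent window is wanted instead, one possible approach is to keep publish timestamps in a queue and evict old ones. This is a sketch under stated assumptions: `RateWindow` is a hypothetical type, and timestamps are plain seconds passed in by the caller so the behaviour is deterministic.

```rust
use std::collections::VecDeque;

/// Counts publish events inside a trailing time window (hypothetical helper).
struct RateWindow {
    window_secs: u64,
    timestamps: VecDeque<u64>,
}

impl RateWindow {
    /// A zero-length window is clamped to one second to avoid dividing by zero.
    fn new(window_secs: u64) -> Self {
        Self {
            window_secs: window_secs.max(1),
            timestamps: VecDeque::new(),
        }
    }

    /// Records one published message at time `now_secs`.
    fn record(&mut self, now_secs: u64) {
        self.timestamps.push_back(now_secs);
        self.evict(now_secs);
    }

    /// Messages per second over the window ending at `now_secs`.
    fn rate(&mut self, now_secs: u64) -> f64 {
        self.evict(now_secs);
        self.timestamps.len() as f64 / self.window_secs as f64
    }

    /// Drops entries whose age is at least `window_secs`.
    fn evict(&mut self, now_secs: u64) {
        while let Some(&oldest) = self.timestamps.front() {
            if now_secs.saturating_sub(oldest) >= self.window_secs {
                self.timestamps.pop_front();
            } else {
                break;
            }
        }
    }
}
```

The queue grows with the message rate, so at high throughput a bucketed counter (one count per second) would use less memory than one entry per message.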
Status Reporting Integration#
impl DaemoneyeAgent {
pub async fn health_report(&self) -> AgentHealthReport {
let broker_health = self.event_broker.health_status().await;
let detection_health = self.detection_engine.health_status().await;
let collector_health = self.collector_manager.health_status().await;
AgentHealthReport {
overall_status: self.calculate_overall_status(&[
&broker_health.status,
&detection_health.status,
&collector_health.status,
]),
broker: broker_health,
detection_engine: detection_health,
collectors: collector_health,
timestamp: SystemTime::now(),
}
}
fn calculate_overall_status(&self, statuses: &[&HealthStatus]) -> HealthStatus {
if statuses.iter().any(|s| matches!(s, HealthStatus::Unhealthy)) {
HealthStatus::Unhealthy
} else if statuses.iter().any(|s| matches!(s, HealthStatus::Degraded)) {
HealthStatus::Degraded
} else {
HealthStatus::Healthy
}
}
}
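The "worst status wins" rule in `calculate_overall_status` can also be expressed through ordering. The sketch below is an alternative formulation, not the agent's code: it assumes `HealthStatus` may additionally derive `Copy` and `Ord`, and `overall_status` is a hypothetical free function.

```rust
/// Variants are declared from best to worst, so the derived `Ord`
/// ranks `Unhealthy` above `Degraded` above `Healthy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

/// Returns the worst status among the components.
/// An empty slice is treated as healthy (an assumption of this sketch).
fn overall_status(statuses: &[HealthStatus]) -> HealthStatus {
    statuses
        .iter()
        .copied()
        .max()
        .unwrap_or(HealthStatus::Healthy)
}
```

The trade-off is that the result now depends on the declaration order of the variants, so reordering the enum would silently change the aggregation.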
Resource Allocation and Performance Characteristics#
Memory Management#
pub struct BrokerResourceLimits {
/// Maximum memory usage in bytes
pub max_memory_bytes: usize,
/// Maximum number of queued messages per subscriber
pub max_queued_messages: usize,
/// Maximum message size in bytes
pub max_message_size: usize,
/// Connection pool size
pub connection_pool_size: usize,
}
impl Default for BrokerResourceLimits {
fn default() -> Self {
Self {
max_memory_bytes: 100 * 1024 * 1024, // 100MB
max_queued_messages: 1000,
max_message_size: 1024 * 1024, // 1MB
connection_pool_size: 64,
}
}
}
impl DaemoneyeBroker {
pub async fn enforce_resource_limits(&self) -> Result<()> {
let current_memory = self.estimate_memory_usage().await;
let limits = &self.config.resource_limits;
if current_memory > limits.max_memory_bytes {
self.apply_backpressure().await?;
}
// Check message queue sizes
let senders_guard = self.subscriber_senders.lock().await;
for (subscriber_id, sender) in senders_guard.iter() {
if sender.len() > limits.max_queued_messages {
tracing::warn!(
subscriber_id = subscriber_id,
queue_size = sender.len(),
"Subscriber queue exceeded max_queued_messages"
);
}
}
Ok(())
}
async fn apply_backpressure(&self) -> Result<()> {
// Placeholder: this only logs; no throttling or message shedding is performed here
tracing::warn!("Applying backpressure due to memory limits");
Ok(())
}
}
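One simple way to bound per-subscriber queues is a fixed-capacity channel with a non-blocking send, so a slow subscriber cannot stall the publisher. The sketch below uses the standard library's `sync_channel` purely for illustration; the broker's actual channel type is not shown in this document, and `Delivery`, `subscriber_queue`, and `try_deliver` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome of a non-blocking delivery attempt.
#[derive(Debug, PartialEq)]
enum Delivery {
    Queued,
    DroppedQueueFull,
    SubscriberGone,
}

/// Creates a subscriber queue that holds at most `max_queued_messages`.
/// Note: a capacity of 0 makes `sync_channel` a rendezvous channel.
fn subscriber_queue(max_queued_messages: usize) -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
    sync_channel(max_queued_messages)
}

/// Tries to enqueue `message` without blocking the publisher.
/// A full queue drops the new message instead of waiting.
fn try_deliver(sender: &SyncSender<Vec<u8>>, message: Vec<u8>) -> Delivery {
    match sender.try_send(message) {
        Ok(()) => Delivery::Queued,
        Err(TrySendError::Full(_)) => Delivery::DroppedQueueFull,
        Err(TrySendError::Disconnected(_)) => Delivery::SubscriberGone,
    }
}
```

Dropping the newest message is only one policy; dropping the oldest, or disconnecting the slow subscriber, are alternatives with different delivery guarantees.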
Performance Characteristics#
| Metric | Target | Measurement Method |
|--------|--------|--------------------|
| **Message Throughput** | 10,000+ msg/sec | Benchmark with synthetic load |
| **Latency** | < 1ms (local IPC) | End-to-end message delivery time |
| **Memory Usage** | < 100MB baseline | RSS monitoring with process stats |
| **Connection Overhead** | < 1MB per connection | Memory profiling per client |
| **CPU Usage** | < 5% sustained | Process CPU monitoring |
| **Startup Time** | < 2 seconds | Time from start() to ready |
impl DaemoneyeBroker {
pub async fn performance_metrics(&self) -> BrokerPerformanceMetrics {
let stats = self.statistics().await;
// Snapshot queue state in a short scope so the lock is released
// before awaiting the metric helpers below
let (queue_depths, active_connections) = {
let senders_guard = self.subscriber_senders.lock().await;
let depths = senders_guard
.iter()
.map(|(id, sender)| (id.clone(), sender.len()))
.collect();
(depths, senders_guard.len())
};
BrokerPerformanceMetrics {
messages_per_second: self.calculate_message_rate().await,
avg_latency_us: self.calculate_avg_latency().await,
memory_usage_bytes: self.estimate_memory_usage().await,
cpu_usage_percent: self.get_cpu_usage().await,
active_connections,
queue_depths,
}
}
}
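The `avg_latency_us` field implies that delivery latencies are accumulated somewhere. How `calculate_avg_latency` does this is not shown here; the following is a minimal sketch of one way to keep a running mean, with `LatencyTracker` as a hypothetical type.

```rust
use std::time::Duration;

/// Accumulates delivery latencies and reports their mean in microseconds.
/// (Hypothetical helper; not the broker's actual implementation.)
#[derive(Debug, Default)]
struct LatencyTracker {
    total_us: u128,
    samples: u64,
}

impl LatencyTracker {
    /// Adds one end-to-end delivery measurement.
    fn record(&mut self, latency: Duration) {
        self.total_us += latency.as_micros();
        self.samples += 1;
    }

    /// Mean latency in microseconds, or 0.0 when nothing was recorded.
    fn avg_latency_us(&self) -> f64 {
        if self.samples == 0 {
            0.0
        } else {
            self.total_us as f64 / self.samples as f64
        }
    }
}
```

A plain mean hides tail latency, so checking the sub-millisecond target would likely also need percentiles or a histogram.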
## Integration Points
### Detection Engine Integration
```rust
impl DetectionEngine {
pub async fn new(config: DetectionConfig, broker: Arc<DaemoneyeBroker>) -> Result<Self> {
let mut event_bus = DaemoneyeEventBus::from_broker((*broker).clone()).await?;
// Subscribe to all collector events for detection processing
let subscription = EventSubscription {
subscriber_id: "detection-engine".to_string(),
capabilities: SourceCaps {
event_types: vec![
"process".to_string(),
"network".to_string(),
"filesystem".to_string(),
"performance".to_string(),
],
collectors: vec!["*".to_string()], // All collectors
max_priority: 10,
},
topic_patterns: Some(vec!["events.*".to_string()]),
enable_wildcards: true,
event_filter: None,
correlation_filter: None,
};
let event_receiver = event_bus.subscribe(subscription).await?;
Ok(Self {
config,
event_bus,
event_receiver: Some(event_receiver),
broker,
})
}
}
```
Collector Lifecycle Integration#
impl CollectorManager {
pub async fn start_collector(&self, collector_type: &str) -> Result<()> {
let rpc_client = CollectorRpcClient::new(Arc::clone(&self.broker));
let request = CollectorLifecycleRequest {
operation: CollectorOperation::Start,
collector_id: collector_type.to_string(),
config: self.get_collector_config(collector_type)?,
};
let response = rpc_client.send_request(request).await?;
if response.status == RpcStatus::Success {
tracing::info!("Successfully started collector: {}", collector_type);
} else {
return Err(EventBusError::rpc_error(format!(
"Failed to start collector {}: {}",
collector_type,
response.error_message.unwrap_or_default()
)));
}
Ok(())
}
}
In summary, the embedded broker runs in-process with daemoneye-agent, keeps topic routing, transport, and health reporting in separate components, and exposes the same broker handle to the detection engine and collector manager, which leaves room for future extension.