Migration Strategy: Crossbeam to DaemonEye EventBus
Type
External
Status
Published
Created
Mar 8, 2026
Updated
Apr 3, 2026
Updated by
Dosu Bot

Overview#

This document outlines the migration strategy from the current crossbeam-based event bus implementation to the daemoneye-eventbus message broker. The migration preserves existing event bus semantics while enabling multi-process communication and enhanced scalability.

Current Crossbeam Implementation Analysis#

Existing Architecture#

The current collector-core uses crossbeam channels for high-performance in-process event distribution:

// Current crossbeam-based implementation
use std::{collections::HashMap, thread};

use crossbeam::{
    channel::{Receiver, Sender, bounded, unbounded},
    utils::Backoff,
};

pub struct HighPerformanceEventBusImpl {
    publisher: Sender<BusEvent>, // Crossbeam unbounded sender
    subscribers: HashMap<String, Sender<BusEvent>>, // Per-subscriber channels, keyed by subscriber ID
    routing_handle: thread::JoinHandle<()>, // Background routing thread
}
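
The routing shape implied by this struct (one publisher channel, a background thread, one channel per subscriber) can be sketched as follows. This is a minimal illustration that uses `std::sync::mpsc` in place of crossbeam so it runs without external crates; `spawn_router` is a hypothetical name, not the collector-core API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Spawns a routing thread that copies every published event to each
/// subscriber channel. Returns the publisher handle and the thread handle.
pub fn spawn_router<T: Clone + Send + 'static>(
    subscribers: HashMap<String, Sender<T>>,
) -> (Sender<T>, thread::JoinHandle<()>) {
    let (publisher, inbox) = channel::<T>();
    let handle = thread::spawn(move || {
        // The loop ends once every publisher handle has been dropped.
        for event in inbox {
            for tx in subscribers.values() {
                // A subscriber that went away is skipped, not treated as fatal.
                let _ = tx.send(event.clone());
            }
        }
    });
    (publisher, handle)
}
```

Because a single thread drains the publisher channel in order, each subscriber sees events in publish order, which is the FIFO property the migration has to preserve.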

Key Characteristics to Preserve#

  1. High Throughput: 10,000+ messages/second
  2. Low Latency: Sub-millisecond delivery
  3. Backpressure Handling: Configurable strategies (drop, block, circuit-break)
  4. Lock-Free Operation: Minimal contention
  5. Event Ordering: FIFO delivery per subscriber
  6. Graceful Shutdown: Clean resource cleanup
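
The three backpressure strategies named in item 3 can be sketched over a bounded channel. This is an illustration only: `BackpressureStrategy`, `SendOutcome`, `GuardedSender`, and `guarded_channel` are hypothetical names, `std::sync::mpsc::sync_channel` stands in for a crossbeam bounded channel, and the circuit breaker here never closes again once opened.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    Drop,                                          // discard the event when the queue is full
    Block,                                         // wait for space in the queue
    CircuitBreak { max_consecutive_failures: u32 }, // stop sending after repeated failures
}

#[derive(Debug, PartialEq, Eq)]
pub enum SendOutcome {
    Delivered,
    Dropped,
    CircuitOpen,
    Disconnected,
}

pub struct GuardedSender<T> {
    tx: SyncSender<T>,
    strategy: BackpressureStrategy,
    consecutive_failures: u32,
}

/// Creates a bounded channel whose sender applies the given strategy.
pub fn guarded_channel<T>(
    capacity: usize,
    strategy: BackpressureStrategy,
) -> (GuardedSender<T>, Receiver<T>) {
    let (tx, rx) = sync_channel(capacity);
    (GuardedSender { tx, strategy, consecutive_failures: 0 }, rx)
}

impl<T> GuardedSender<T> {
    pub fn send(&mut self, event: T) -> SendOutcome {
        let strategy = self.strategy;
        match strategy {
            BackpressureStrategy::Block => match self.tx.send(event) {
                Ok(()) => SendOutcome::Delivered,
                Err(_) => SendOutcome::Disconnected,
            },
            BackpressureStrategy::Drop => match self.tx.try_send(event) {
                Ok(()) => SendOutcome::Delivered,
                Err(TrySendError::Full(_)) => SendOutcome::Dropped,
                Err(TrySendError::Disconnected(_)) => SendOutcome::Disconnected,
            },
            BackpressureStrategy::CircuitBreak { max_consecutive_failures } => {
                if self.consecutive_failures >= max_consecutive_failures {
                    return SendOutcome::CircuitOpen;
                }
                match self.tx.try_send(event) {
                    Ok(()) => {
                        self.consecutive_failures = 0;
                        SendOutcome::Delivered
                    }
                    Err(TrySendError::Full(_)) => {
                        self.consecutive_failures += 1;
                        SendOutcome::Dropped
                    }
                    Err(TrySendError::Disconnected(_)) => SendOutcome::Disconnected,
                }
            }
        }
    }
}
```

Equivalence tests can drive both bus implementations into a full-queue condition and compare outcomes of this kind, rather than only comparing the happy path.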

Migration Mapping#

Channel Type Mapping#

| Crossbeam Component | DaemonEye EventBus Equivalent | Migration Notes |
|---|---|---|
| `crossbeam::channel::unbounded()` | `DaemoneyeBroker::publish()` | Main event distribution |
| `crossbeam::channel::bounded()` | `mpsc::UnboundedReceiver` | Per-subscriber queues |
| `crossbeam::utils::Backoff` | Built-in broker backpressure | Automatic handling |
| `std::thread::spawn()` | `tokio::spawn()` | Async task management |

Interface Compatibility#

// BEFORE: Crossbeam EventBus trait
#[async_trait]
pub trait EventBus: Send + Sync {
    async fn publish(&self, event: CollectionEvent, correlation_id: Option<String>) -> Result<()>;
    async fn subscribe(
        &mut self,
        subscription: EventSubscription,
    ) -> Result<mpsc::UnboundedReceiver<BusEvent>>;
    async fn unsubscribe(&mut self, subscriber_id: &str) -> Result<()>;
    async fn get_statistics(&self) -> Result<EventBusStatistics>;
}

// AFTER: DaemonEye EventBus trait (same operations, but `publish` takes
// `&mut self` and a required correlation ID, and `statistics` is infallible)
#[async_trait]
pub trait EventBus: Send + Sync {
    async fn publish(&mut self, event: CollectionEvent, correlation_id: String) -> Result<()>;
    async fn subscribe(
        &mut self,
        subscription: EventSubscription,
    ) -> Result<mpsc::UnboundedReceiver<BusEvent>>;
    async fn unsubscribe(&mut self, subscriber_id: &str) -> Result<()>;
    async fn statistics(&self) -> EventBusStatistics;
}

Migration Implementation Plan#

Phase 1: Interface Adaptation Layer#

Create a compatibility wrapper that implements the existing EventBus trait on top of daemoneye-eventbus and absorbs the signature differences between the two traits:

// collector-core/src/event_bus.rs - Updated implementation
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use async_trait::async_trait;
use daemoneye_eventbus::{DaemoneyeBroker, DaemoneyeEventBus, EventBus as DaemoneyeEventBusTrait};
use tokio::sync::Mutex;
use uuid::Uuid;

pub struct DaemoneyeEventBusAdapter {
    // The daemoneye-eventbus trait publishes through `&mut self`, while the
    // collector-core trait publishes through `&self`, so the inner bus sits
    // behind an async mutex.
    inner: Mutex<DaemoneyeEventBus>,
    broker: Arc<DaemoneyeBroker>,
}

impl DaemoneyeEventBusAdapter {
    pub async fn new(socket_path: &str) -> Result<Self> {
        let broker = Arc::new(DaemoneyeBroker::new(socket_path).await?);
        broker.start().await?;
        let inner = DaemoneyeEventBus::from_broker((*broker).clone()).await?;
        Ok(Self { inner: Mutex::new(inner), broker })
    }
}

#[async_trait]
impl EventBus for DaemoneyeEventBusAdapter {
    async fn publish(&self, event: CollectionEvent, correlation_id: Option<String>) -> Result<()> {
        // The broker requires a correlation ID, so generate one when the caller omits it.
        let correlation = correlation_id.unwrap_or_else(|| Uuid::new_v4().to_string());
        let daemoneye_event = self.convert_event(event)?;
        self.inner
            .lock()
            .await
            .publish(daemoneye_event, correlation)
            .await
            .map_err(|e| anyhow::anyhow!("EventBus publish failed: {}", e))
    }

    async fn subscribe(
        &mut self,
        subscription: EventSubscription,
    ) -> Result<tokio::sync::mpsc::UnboundedReceiver<BusEvent>> {
        let daemoneye_subscription = self.convert_subscription(subscription)?;
        let receiver = self
            .inner
            .get_mut() // `&mut self` already gives exclusive access; no lock needed
            .subscribe(daemoneye_subscription)
            .await
            .map_err(|e| anyhow::anyhow!("EventBus subscribe failed: {}", e))?;

        // Forward broker events to the caller, converting each one to the
        // collector-core event type on the way.
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        tokio::spawn(async move {
            let mut daemoneye_receiver = receiver;
            while let Some(daemoneye_event) = daemoneye_receiver.recv().await {
                let collector_event = Self::convert_bus_event(daemoneye_event);
                if tx.send(collector_event).is_err() {
                    break; // Receiver dropped
                }
            }
        });
        Ok(rx)
    }

    async fn unsubscribe(&mut self, subscriber_id: &str) -> Result<()> {
        self.inner
            .get_mut()
            .unsubscribe(subscriber_id)
            .await
            .map_err(|e| anyhow::anyhow!("EventBus unsubscribe failed: {}", e))
    }

    async fn get_statistics(&self) -> Result<EventBusStatistics> {
        let daemoneye_stats = self.inner.lock().await.statistics().await;
        Ok(EventBusStatistics {
            events_published: daemoneye_stats.messages_published,
            events_delivered: daemoneye_stats.messages_delivered,
            active_subscribers: daemoneye_stats.active_subscribers,
            uptime: Duration::from_secs(daemoneye_stats.uptime_seconds),
        })
    }
}
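
The forwarding task inside `subscribe` is a convert-and-forward bridge between two channels. The same pattern can be exercised in isolation with standard-library channels and a thread. This is a sketch under that substitution, and `bridge` is a hypothetical helper name.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Reads items from `source`, converts each one, and forwards it to a new
/// channel whose receiving end is returned to the caller.
pub fn bridge<A, B, F>(source: Receiver<A>, convert: F) -> Receiver<B>
where
    A: Send + 'static,
    B: Send + 'static,
    F: Fn(A) -> B + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        // The loop ends when the upstream sender is dropped.
        for item in source {
            if tx.send(convert(item)).is_err() {
                break; // Downstream receiver dropped
            }
        }
    });
    rx
}
```

A single forwarding task per subscription keeps items in arrival order, at the cost of one extra queue hop per event. That extra hop is worth measuring when checking the latency budget.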

Phase 2: Event Type Conversion#

Map collector-core events to daemoneye-eventbus events:

impl DaemoneyeEventBusAdapter {
    fn convert_event(
        &self,
        event: collector_core::CollectionEvent,
    ) -> Result<daemoneye_eventbus::CollectionEvent> {
        match event {
            collector_core::CollectionEvent::Process(proc_event) => Ok(
                daemoneye_eventbus::CollectionEvent::Process(
                    daemoneye_eventbus::ProcessEvent {
                        pid: proc_event.pid,
                        name: proc_event.name,
                        command_line: proc_event.command_line.join(" ").into(),
                        executable_path: proc_event.executable_path,
                        ppid: proc_event.ppid,
                        start_time: proc_event.start_time,
                        metadata: proc_event.platform_metadata.unwrap_or_default(),
                    },
                ),
            ),
            collector_core::CollectionEvent::Network(net_event) => {
                Ok(daemoneye_eventbus::CollectionEvent::Network(
                    daemoneye_eventbus::NetworkEvent {
                        connection_id: net_event.connection_id,
                        protocol: net_event.protocol,
                        local_addr: net_event.local_addr,
                        remote_addr: net_event.remote_addr,
                        metadata: HashMap::new(),
                    },
                ))
            }
            // Add other event type conversions...
        }
    }

    fn convert_bus_event(
        daemoneye_event: daemoneye_eventbus::BusEvent,
    ) -> collector_core::BusEvent {
        collector_core::BusEvent {
            id: Uuid::parse_str(&daemoneye_event.event_id)
                .unwrap_or_else(|_| Uuid::new_v4()),
            timestamp: daemoneye_event
                .bus_timestamp
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            event: Self::convert_daemoneye_event_to_collector(daemoneye_event.event),
            correlation_id: Some(daemoneye_event.correlation_id),
            routing_metadata: HashMap::new(),
        }
    }
}
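
Two of the conversions above are lossy, which matters for equivalence tests: the timestamp is truncated to whole seconds (and falls back to 0 for times before the Unix epoch), and the command line is flattened into one space-joined string. The sketch below isolates both steps; `unix_seconds` and `join_command_line` are hypothetical helper names.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Whole seconds since the Unix epoch; 0 if `ts` is earlier than the epoch.
pub fn unix_seconds(ts: SystemTime) -> u64 {
    ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs()
}

/// Joins arguments with single spaces. Arguments that themselves contain
/// spaces can no longer be told apart after this step.
pub fn join_command_line(args: &[String]) -> String {
    args.join(" ")
}
```

If subscribers depend on sub-second ordering or on exact argument boundaries, those fields would need a richer representation than the one shown here.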

Phase 3: Topic Pattern Migration#

Map crossbeam routing to daemoneye-eventbus topics:

impl DaemoneyeEventBusAdapter {
    fn convert_subscription(
        &self,
        subscription: collector_core::EventSubscription,
    ) -> Result<daemoneye_eventbus::EventSubscription> {
        let topic_patterns = if let Some(patterns) = subscription.topic_patterns {
            patterns.into_iter().map(|p| self.map_topic_pattern(p)).collect()
        } else {
            self.generate_topic_patterns_from_capabilities(&subscription.capabilities)
        };

        Ok(daemoneye_eventbus::EventSubscription {
            subscriber_id: subscription.subscriber_id,
            capabilities: daemoneye_eventbus::SourceCaps {
                event_types: self.extract_event_types(&subscription.capabilities),
                collectors: vec!["*".to_string()],
                max_priority: 10,
            },
            event_filter: subscription
                .event_filter
                .map(|f| self.convert_event_filter(f)),
            correlation_filter: subscription.correlation_filter.map(|c| {
                daemoneye_eventbus::CorrelationFilter {
                    correlation_id: Some(c),
                    process_ids: vec![],
                }
            }),
            topic_patterns: Some(topic_patterns),
            enable_wildcards: subscription.enable_wildcards,
        })
    }

    fn map_topic_pattern(&self, pattern: String) -> String {
        match pattern.as_str() {
            "process" | "process.*" => "events.process.*".to_string(),
            "network" | "network.*" => "events.network.*".to_string(),
            "filesystem" | "filesystem.*" => "events.filesystem.*".to_string(),
            "performance" | "performance.*" => "events.performance.*".to_string(),
            "control" | "control.*" => "control.*".to_string(),
            _ => format!("events.{}", pattern),
        }
    }
}
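
The mapping rules are easiest to check as a free function. The sketch below reproduces the match arms of `map_topic_pattern` unchanged, taking `&str` instead of `self` and `String` so it can run on its own.

```rust
/// Standalone copy of the topic mapping rules shown above.
pub fn map_topic_pattern(pattern: &str) -> String {
    match pattern {
        "process" | "process.*" => "events.process.*".to_string(),
        "network" | "network.*" => "events.network.*".to_string(),
        "filesystem" | "filesystem.*" => "events.filesystem.*".to_string(),
        "performance" | "performance.*" => "events.performance.*".to_string(),
        "control" | "control.*" => "control.*".to_string(),
        // Anything else is placed under the `events.` namespace as-is.
        _ => format!("events.{}", pattern),
    }
}
```

Two edge cases follow from the fallback arm as written: a pattern that already starts with `events.` is prefixed a second time, and a specific control topic such as `control.start` is mapped under `events.` rather than `control.`. Whether either is intended is worth confirming before subscribers rely on it.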

Phase 4: Configuration Migration#

Update collector-core configuration to use daemoneye-eventbus:

// collector-core/src/config.rs - Updated configuration
#[derive(Debug, Clone, serde::Deserialize)]
pub struct CollectorConfig {
    pub event_bus: EventBusConfig,
}

#[derive(Debug, Clone, serde::Deserialize)]
pub struct EventBusConfig {
    pub bus_type: EventBusType,
    pub socket_path: Option<String>,
    pub crossbeam: Option<CrossbeamConfig>,
    pub daemoneye: Option<DaemoneyeConfig>,
}

#[derive(Debug, Clone, serde::Deserialize)]
pub enum EventBusType {
    #[serde(rename = "local")]
    Local, // Crossbeam-based (legacy)
    #[serde(rename = "daemoneye")]
    DaemonEye, // DaemonEye eventbus (new)
}

impl Default for EventBusConfig {
    fn default() -> Self {
        Self {
            bus_type: EventBusType::DaemonEye,
            socket_path: None,
            crossbeam: None,
            daemoneye: Some(DaemoneyeConfig::default()),
        }
    }
}
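
With the serde renames above, `bus_type` accepts exactly two strings, `local` and `daemoneye`, and matching is case-sensitive. The sketch below expresses the same rule with the standard-library `FromStr` trait so it can be run without serde; it is an illustration of the accepted values, not a replacement for the derive.

```rust
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventBusType {
    Local,     // Crossbeam-based (legacy)
    DaemonEye, // DaemonEye eventbus (new)
}

impl FromStr for EventBusType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "local" => Ok(EventBusType::Local),
            "daemoneye" => Ok(EventBusType::DaemonEye),
            other => Err(format!("unknown bus_type: {}", other)),
        }
    }
}
```

Rejecting unknown values at load time, rather than silently defaulting, keeps a mistyped rollback setting from leaving a collector on the wrong bus.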

Phase 5: Factory Pattern for Event Bus Creation#

Create a factory to instantiate the appropriate event bus implementation:

// collector-core/src/event_bus.rs - Factory implementation
pub struct EventBusFactory;

impl EventBusFactory {
    pub async fn create(config: &EventBusConfig) -> Result<Box<dyn EventBus>> {
        match config.bus_type {
            EventBusType::Local => {
                let crossbeam_config = config.crossbeam.clone().unwrap_or_default();
                Ok(Box::new(LocalEventBus::new(crossbeam_config.into())))
            }
            EventBusType::DaemonEye => {
                let socket_path = config
                    .socket_path
                    .as_deref()
                    .unwrap_or("/tmp/daemoneye-collector.sock");
                let adapter = DaemoneyeEventBusAdapter::new(socket_path).await?;
                Ok(Box::new(adapter))
            }
        }
    }
}

// Usage in collector-core
impl Collector {
    pub async fn new(config: CollectorConfig) -> Result<Self> {
        let event_bus = EventBusFactory::create(&config.event_bus).await?;
        Ok(Self { config, event_bus })
    }
}

Behavioral Equivalence Testing#

Test Strategy#

  1. Unit Tests: Verify interface compatibility
  2. Integration Tests: Compare crossbeam vs daemoneye-eventbus behavior
  3. Performance Tests: Ensure throughput and latency requirements
  4. Stress Tests: Validate under high load conditions

Test Implementation#

#[cfg(test)]
mod migration_tests {
    use super::*;

    #[tokio::test]
    async fn test_behavioral_equivalence() {
        let mut crossbeam_bus = LocalEventBus::new(EventBusConfig::default());
        let mut daemoneye_bus = DaemoneyeEventBusAdapter::new("/tmp/test.sock")
            .await
            .unwrap();

        // `subscribe` takes `&mut self`, so the shared helper needs mutable access.
        test_publish_subscribe_pattern(&mut crossbeam_bus).await;
        test_publish_subscribe_pattern(&mut daemoneye_bus).await;

        let crossbeam_stats = crossbeam_bus.get_statistics().await.unwrap();
        let daemoneye_stats = daemoneye_bus.get_statistics().await.unwrap();

        assert_eq!(
            crossbeam_stats.events_published,
            daemoneye_stats.events_published
        );
        assert_eq!(
            crossbeam_stats.events_delivered,
            daemoneye_stats.events_delivered
        );
    }

    async fn test_publish_subscribe_pattern<T: EventBus>(bus: &mut T) {
        // Identical test logic for both implementations
    }

    #[tokio::test]
    async fn test_performance_equivalence() {
        let crossbeam_throughput = benchmark_crossbeam().await;
        let daemoneye_throughput = benchmark_daemoneye().await;
        assert!(daemoneye_throughput >= crossbeam_throughput * 0.8); // 80% minimum
    }
}
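
The benchmark helpers referenced in `test_performance_equivalence` are not shown in this document. As a rough illustration of what such a helper measures, the sketch below times messages through a bounded standard-library channel and applies the 80% floor. `measure_channel_throughput` and `meets_throughput_floor` are hypothetical names, and a real comparison would drive the two event bus implementations rather than a plain channel.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Instant;

/// Sends `count` messages through a bounded channel and returns the
/// observed rate in messages per second.
pub fn measure_channel_throughput(count: u64, capacity: usize) -> f64 {
    let (tx, rx) = sync_channel::<u64>(capacity);
    let start = Instant::now();
    let producer = thread::spawn(move || {
        for i in 0..count {
            if tx.send(i).is_err() {
                break; // Receiver dropped
            }
        }
        // `tx` is dropped here, which ends the receiver's iterator.
    });
    let received = rx.iter().count() as f64;
    producer.join().expect("producer thread panicked");
    // Guard against a zero elapsed time on very small runs.
    let elapsed = start.elapsed().as_secs_f64().max(f64::EPSILON);
    received / elapsed
}

/// True when `candidate` reaches at least `min_ratio` of `baseline`.
pub fn meets_throughput_floor(candidate: f64, baseline: f64, min_ratio: f64) -> bool {
    candidate >= baseline * min_ratio
}
```

Single-run numbers are noisy, so a threshold assertion of this kind is more stable when each side is measured several times and the medians are compared.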

Migration Rollout Plan#

Phase 1: Preparation (Week 1)#

  • Implement DaemoneyeEventBusAdapter
  • Create event type conversion functions
  • Add configuration support for both bus types
  • Implement EventBusFactory

Phase 2: Testing (Week 2)#

  • Unit tests for adapter functionality
  • Integration tests comparing both implementations
  • Performance benchmarks
  • Stress testing under load

Phase 3: Gradual Migration (Week 3)#

  • Default to daemoneye-eventbus for new deployments
  • Provide configuration flag for crossbeam fallback
  • Monitor production performance metrics
  • Address any compatibility issues

Phase 4: Legacy Removal (Week 4)#

  • Remove crossbeam dependencies
  • Clean up legacy code paths
  • Update documentation
  • Final performance validation

Risk Mitigation#

Compatibility Risks#

  • Event Ordering: Ensure FIFO delivery is maintained
    • Mitigation: Use single-threaded routing in daemoneye-eventbus
  • Performance Regression: Potential throughput/latency impact
    • Mitigation: Comprehensive benchmarking and optimization
  • Memory Usage: Different memory patterns between implementations
    • Mitigation: Memory profiling and resource limit enforcement
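
The event-ordering risk can be checked mechanically in tests by stamping each published event with an increasing sequence number and inspecting what each subscriber received. The sequence number is an assumption made for this sketch, not an existing field, and `first_ordering_violation` is a hypothetical helper.

```rust
/// Returns the index of the first received sequence number that is not
/// strictly greater than its predecessor, or `None` if the order is FIFO.
pub fn first_ordering_violation(seqs: &[u64]) -> Option<usize> {
    seqs.windows(2)
        .position(|pair| pair[1] <= pair[0])
        .map(|i| i + 1)
}
```

Running the same check against both bus implementations, one subscriber at a time, gives a direct test of the per-subscriber FIFO guarantee.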

Rollback Strategy#

  1. Configuration-Based Rollback: Switch bus_type to "local"
  2. Gradual Rollback: Migrate collectors back to crossbeam individually
  3. Emergency Rollback: Revert to previous collector-core version

Success Criteria#

Functional Requirements#

  • All existing EventBus trait methods work identically
  • Event delivery semantics preserved (ordering, reliability)
  • Subscription patterns work as expected
  • Statistics and monitoring maintain accuracy

Performance Requirements#

  • Throughput: greater than or equal to 80% of crossbeam performance
  • Latency: no more than 2x crossbeam latency for local IPC
  • Memory: no more than 150% of crossbeam memory usage
  • CPU: no more than 120% of crossbeam CPU usage
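
These four thresholds translate directly into a pass/fail check against a crossbeam baseline. The sketch below is one way to encode them; the `Metrics` struct, its field names, and its units are assumptions chosen for illustration.

```rust
#[derive(Debug, Clone, Copy)]
pub struct Metrics {
    pub throughput_msgs_per_sec: f64,
    pub latency_micros: f64,
    pub memory_bytes: f64,
    pub cpu_percent: f64,
}

/// Returns the names of the requirements that `candidate` fails relative
/// to `baseline`, in the order listed above. Empty means all pass.
pub fn violations(baseline: &Metrics, candidate: &Metrics) -> Vec<&'static str> {
    let mut failed = Vec::new();
    if candidate.throughput_msgs_per_sec < 0.8 * baseline.throughput_msgs_per_sec {
        failed.push("throughput"); // below 80% of baseline
    }
    if candidate.latency_micros > 2.0 * baseline.latency_micros {
        failed.push("latency"); // more than 2x baseline
    }
    if candidate.memory_bytes > 1.5 * baseline.memory_bytes {
        failed.push("memory"); // more than 150% of baseline
    }
    if candidate.cpu_percent > 1.2 * baseline.cpu_percent {
        failed.push("cpu"); // more than 120% of baseline
    }
    failed
}
```

For example, against a baseline of 10,000 messages/second, the throughput floor is 8,000 messages/second; against a baseline latency of 100 microseconds, the ceiling is 200 microseconds.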

Operational Requirements#

  • Graceful shutdown works correctly
  • Error handling maintains robustness
  • Configuration migration is seamless
  • Monitoring and observability preserved

Conclusion#

This strategy replaces crossbeam channels with daemoneye-eventbus behind the existing EventBus trait, so collector-core callers keep the same interface while gaining multi-process communication. The crossbeam bus stays selectable through bus_type until Phase 4, which leaves time for equivalence and performance testing and keeps rollback a configuration change rather than a redeploy.