Skip to content

Observer Module Developer Guide¤

Development Environment Setup¤

Prerequisites¤

  • Python 3.8+ with asyncio support
  • Poetry or pip for dependency management
  • pytest-asyncio for testing async components
  • mypy for type checking

Installation¤

# Install development dependencies
pip install -e ".[dev]"

# Install testing tools
pip install pytest-asyncio pytest-mock

# Verify installation
python -c "from panther.core.observer import IObserver; print('Observer module ready')"

IDE Configuration¤

Configure your IDE for observer development:

VS Code

{
    "python.linting.mypyEnabled": true,
    "python.testing.pytestArgs": ["--asyncio-mode=auto"],
    "python.testing.pytestEnabled": true
}

Development Workflow¤

Creating Custom Observers¤

1. Basic Observer Implementation¤

from panther.core.observer.base.observer_interface import IObserver
from panther.core.events.base.event_base import BaseEvent

class MyCustomObserver(IObserver):
    """Custom observer for domain-specific event handling.

    Requires:
        Event filtering through is_interested()
        Thread-safe event processing

    Ensures:
        No duplicate event processing
        Graceful error handling
        Resource cleanup
    """

    def __init__(self, config: Dict[str, Any] = None):
        super().__init__()
        self.config = config or {}
        self.processed_count = 0

    def is_interested(self, event_type: str) -> bool:
        """Check if observer handles this event type.

        Args:
            event_type: Type string to check interest for

        Returns:
            True if observer should process this event type

        Example:
            >>> observer = MyCustomObserver()
            >>> observer.is_interested("test.started")
            True
        """
        return event_type.startswith("test.")

    def on_event(self, event: BaseEvent):
        """Process received event.

        Args:
            event: Event to process

        Requires:
            Event UUID not in processed_events_uuids

        Ensures:
            Event UUID added to processed_events_uuids
            Processed count incremented

        Example:
            >>> observer = MyCustomObserver()
            >>> event = TestStartedEvent(entity_id="test-123")
            >>> observer.on_event(event)
            >>> assert observer.processed_count == 1
        """
        if event.uuid in self.processed_events_uuids:
            return  # Skip duplicate

        try:
            self._process_event(event)
            self.processed_events_uuids.append(event.uuid)
            self.processed_count += 1
        except Exception as e:
            self._handle_error(event, e)

    def _process_event(self, event: BaseEvent):
        """Override this method for custom event processing."""
        pass

    def _handle_error(self, event: BaseEvent, error: Exception):
        """Handle processing errors gracefully."""
        print(f"Error processing {event.event_type}: {error}")

2. Typed Observer Implementation¤

from panther.core.observer.base.typed_observer_interface import ITypedObserver
from panther.core.events.test.events import TestEvent

class MyTypedObserver(ITypedObserver[TestEvent]):
    """Type-safe observer for specific event types.

    Provides compile-time type checking for event handling.
    """

    def on_event(self, event: TestEvent):
        """Process test events with type safety.

        Args:
            event: Typed test event
        """
        match event.event_type:
            case "test.started":
                self._handle_test_started(event)
            case "test.completed":
                self._handle_test_completed(event)
            case _:
                self._handle_unknown_event(event)

    def _handle_test_started(self, event: TestEvent):
        """Handle test start events."""
        pass

    def _handle_test_completed(self, event: TestEvent):
        """Handle test completion events."""
        pass

3. Plugin Observer Implementation¤

from panther.core.observer.base.observer_plugin_interface import IPluginObserver
from panther.core.events.plugin.events import PluginEvent

class MyPluginObserver(IPluginObserver):
    """Observer that responds to plugin lifecycle events."""

    def get_supported_events(self) -> List[str]:
        """Return list of supported event types.

        Returns:
            List of event type strings this observer handles
        """
        return [
            "plugin.loaded",
            "plugin.started",
            "plugin.stopped",
            "plugin.error"
        ]

    def handle_plugin_event(self, event: PluginEvent):
        """Handle plugin-specific events.

        Args:
            event: Plugin lifecycle event
        """
        if event.event_type == "plugin.loaded":
            self._on_plugin_loaded(event)
        elif event.event_type == "plugin.error":
            self._on_plugin_error(event)

Observer Factory Integration¤

1. Register Custom Observer¤

from panther.core.observer.factory.observer_factory import get_observer_factory

def register_my_observer():
    """Register custom observer with factory."""
    factory = get_observer_factory()

    factory.register_observer_type(
        "my_custom",
        MyCustomObserver,
        config_schema={
            "enabled": bool,
            "filter_patterns": list,
            "output_format": str
        }
    )

# Register during application startup
register_my_observer()

2. Configuration-Driven Creation¤

# config/observers.yaml
observers:
  - type: my_custom
    enabled: true
    config:
      filter_patterns: ["test.*", "experiment.*"]
      output_format: "json"
def load_observers_from_config():
    """Load observers from configuration file."""
    from panther.core.observer.factory.factory_config import load_observer_config

    observers = load_observer_config("config/observers.yaml")
    return observers

Testing Guidelines¤

Unit Tests for Observers¤

import pytest
from unittest.mock import Mock
from panther.core.events.test.events import TestStartedEvent

class TestMyCustomObserver:
    """Test suite for MyCustomObserver."""

    def test_interest_filtering(self):
        """Test event type interest filtering."""
        observer = MyCustomObserver()

        assert observer.is_interested("test.started")
        assert observer.is_interested("test.completed")
        assert not observer.is_interested("metrics.collected")

    def test_event_processing(self):
        """Test basic event processing."""
        observer = MyCustomObserver()
        event = TestStartedEvent(entity_id="test-123")

        observer.on_event(event)

        assert observer.processed_count == 1
        assert event.uuid in observer.processed_events_uuids

    def test_duplicate_prevention(self):
        """Test duplicate event prevention."""
        observer = MyCustomObserver()
        event = TestStartedEvent(entity_id="test-123")

        # Process same event twice
        observer.on_event(event)
        observer.on_event(event)

        assert observer.processed_count == 1  # Only processed once

    def test_error_handling(self):
        """Test graceful error handling."""
        observer = MyCustomObserver()
        observer._process_event = Mock(side_effect=Exception("Test error"))

        event = TestStartedEvent(entity_id="test-123")

        # Should not raise exception
        observer.on_event(event)

        assert observer.processed_count == 0  # Not incremented on error

Integration Tests¤

import pytest
from panther.core.observer.management.event_manager import EventManager

@pytest.fixture
def event_manager():
    """Create event manager for testing."""
    return EventManager()

def test_observer_registration(event_manager):
    """Test observer registration and event routing."""
    observer = MyCustomObserver()
    event_manager.register_observer(observer)

    # Emit test event
    event = TestStartedEvent(entity_id="integration-test")
    event_manager.emit_event(event)

    assert observer.processed_count == 1

def test_multiple_observers(event_manager):
    """Test multiple observers receiving same event."""
    observer1 = MyCustomObserver()
    observer2 = MyCustomObserver()

    event_manager.register_observer(observer1)
    event_manager.register_observer(observer2)

    event = TestStartedEvent(entity_id="multi-test")
    event_manager.emit_event(event)

    assert observer1.processed_count == 1
    assert observer2.processed_count == 1

Async Observer Development¤

Async Observer Implementation¤

import asyncio
from panther.core.observer.base.observer_interface import IObserver

class MyAsyncObserver(IObserver):
    """Async observer for I/O bound operations."""

    def __init__(self):
        super().__init__()
        self.queue = asyncio.Queue()
        self.processor_task = None

    async def start_processing(self):
        """Start async event processing loop."""
        self.processor_task = asyncio.create_task(self._process_events())

    async def stop_processing(self):
        """Stop async processing and cleanup."""
        if self.processor_task:
            self.processor_task.cancel()
            try:
                await self.processor_task
            except asyncio.CancelledError:
                pass

    def on_event(self, event: BaseEvent):
        """Queue event for async processing."""
        if event.uuid not in self.processed_events_uuids:
            asyncio.create_task(self.queue.put(event))

    async def _process_events(self):
        """Async event processing loop."""
        while True:
            try:
                event = await self.queue.get()
                await self._process_event_async(event)
                self.processed_events_uuids.append(event.uuid)
                self.queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Error in async processing: {e}")

    async def _process_event_async(self, event: BaseEvent):
        """Override for async event processing."""
        # Example: async I/O operation
        await asyncio.sleep(0.1)  # Simulate async work

Testing Async Observers¤

@pytest.mark.asyncio
async def test_async_observer():
    """Test async observer functionality."""
    observer = MyAsyncObserver()
    await observer.start_processing()

    try:
        event = TestStartedEvent(entity_id="async-test")
        observer.on_event(event)

        # Wait for processing
        await observer.queue.join()

        assert event.uuid in observer.processed_events_uuids
    finally:
        await observer.stop_processing()

Performance Optimization¤

Observer Performance Guidelines¤

Efficient Interest Filtering¤

class OptimizedObserver(IObserver):
    """Observer with optimized interest filtering."""

    def __init__(self, event_prefixes: Set[str]):
        super().__init__()
        self.event_prefixes = event_prefixes

    def is_interested(self, event_type: str) -> bool:
        """Fast prefix-based filtering.

        O(1) lookup instead of string operations.
        """
        return any(event_type.startswith(prefix) for prefix in self.event_prefixes)

Batch Processing Pattern¤

class BatchingObserver(IObserver):
    """Observer that processes events in batches."""

    def __init__(self, batch_size: int = 100, flush_interval: float = 5.0):
        super().__init__()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.event_batch = []
        self.last_flush = time.time()

    def on_event(self, event: BaseEvent):
        """Add event to batch for processing."""
        self.event_batch.append(event)

        if (len(self.event_batch) >= self.batch_size or
            time.time() - self.last_flush > self.flush_interval):
            self._flush_batch()

    def _flush_batch(self):
        """Process accumulated events in batch."""
        if not self.event_batch:
            return

        try:
            self._process_batch(self.event_batch)
            self.processed_events_uuids.extend(e.uuid for e in self.event_batch)
        finally:
            self.event_batch.clear()
            self.last_flush = time.time()

Memory Management¤

Observer Lifecycle Management¤

class ManagedObserver(IObserver):
    """Observer with automatic resource management."""

    def __init__(self, max_processed_events: int = 10000):
        super().__init__()
        self.max_processed_events = max_processed_events

    def on_event(self, event: BaseEvent):
        """Process event with memory management."""
        super().on_event(event)

        # Prevent unlimited growth of processed events list
        if len(self.processed_events_uuids) > self.max_processed_events:
            # Keep only recent half
            self.processed_events_uuids = self.processed_events_uuids[-self.max_processed_events//2:]

    def cleanup(self):
        """Cleanup observer resources."""
        self.processed_events_uuids.clear()

Debugging and Troubleshooting¤

Observer Debug Utilities¤

Debug Observer¤

class DebugObserver(IObserver):
    """Observer that logs detailed processing information."""

    def __init__(self, logger=None):
        super().__init__()
        self.logger = logger or logging.getLogger(__name__)
        self.processing_times = {}

    def on_event(self, event: BaseEvent):
        """Process event with detailed logging."""
        start_time = time.time()

        self.logger.debug(f"Processing event: {event.event_type} - {event.uuid}")

        try:
            super().on_event(event)
            processing_time = time.time() - start_time
            self.processing_times[event.event_type] = processing_time

            self.logger.debug(f"Processed in {processing_time:.3f}s")
        except Exception as e:
            self.logger.error(f"Error processing event {event.uuid}: {e}")
            raise

Performance Profiler¤

class ProfilingObserver(IObserver):
    """Observer that profiles event processing performance."""

    def __init__(self):
        super().__init__()
        self.stats = {
            'total_events': 0,
            'processing_times': [],
            'error_count': 0,
            'events_by_type': defaultdict(int)
        }

    def on_event(self, event: BaseEvent):
        """Profile event processing."""
        start_time = time.perf_counter()

        try:
            super().on_event(event)
            processing_time = time.perf_counter() - start_time

            self.stats['total_events'] += 1
            self.stats['processing_times'].append(processing_time)
            self.stats['events_by_type'][event.event_type] += 1

        except Exception as e:
            self.stats['error_count'] += 1
            raise

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate performance statistics report."""
        if not self.stats['processing_times']:
            return {}

        times = self.stats['processing_times']
        return {
            'total_events': self.stats['total_events'],
            'avg_processing_time': statistics.mean(times),
            'median_processing_time': statistics.median(times),
            'max_processing_time': max(times),
            'error_rate': self.stats['error_count'] / self.stats['total_events'],
            'events_by_type': dict(self.stats['events_by_type'])
        }

Common Issues and Solutions¤

Issue: Observer Not Receiving Events¤

def debug_observer_registration():
    """Debug observer registration issues."""
    from panther.core.observer.management.event_manager import get_event_manager

    manager = get_event_manager()
    observers = manager.get_registered_observers()

    print(f"Registered observers: {len(observers)}")
    for i, observer in enumerate(observers):
        print(f"  {i}: {type(observer).__name__}")

        # Test interest filtering
        test_events = ["test.started", "metrics.collected", "error.occurred"]
        for event_type in test_events:
            interested = observer.is_interested(event_type)
            print(f"    {event_type}: {interested}")

Issue: Memory Leaks¤

def check_observer_memory():
    """Check for observer memory issues."""
    import psutil
    import gc

    process = psutil.Process()

    print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB")
    print(f"Open file descriptors: {process.num_fds()}")

    # Check for observer objects
    observer_objects = [obj for obj in gc.get_objects() if isinstance(obj, IObserver)]
    print(f"Observer objects in memory: {len(observer_objects)}")

    # Check processed events list sizes
    for observer in observer_objects:
        events_count = len(observer.processed_events_uuids)
        print(f"  {type(observer).__name__}: {events_count} processed events")

Issue: Performance Degradation¤

# Profile observer performance
python -m cProfile -s cumulative -o observer_profile.prof test_observer_performance.py

# Analyze profile results
python -c "
import pstats
stats = pstats.Stats('observer_profile.prof')
stats.sort_stats('cumulative').print_stats(20)
"

# Memory profiling
python -m memory_profiler observer_memory_test.py

Quality Checklist¤

Code Quality¤

  • Observer implements IObserver interface correctly
  • Proper error handling and logging
  • Thread-safe implementation if needed
  • Resource cleanup in destructor/cleanup methods
  • Type hints for all methods and parameters

Testing Coverage¤

  • Unit tests for all observer methods
  • Integration tests with event manager
  • Error condition testing
  • Performance/load testing for high-volume scenarios
  • Memory leak testing for long-running observers

Documentation¤

  • Class and method docstrings with Args/Returns
  • Usage examples in docstrings
  • Configuration documentation
  • Performance characteristics documented
  • Error handling behavior documented