Observer Pattern — Event-Driven Architecture 📡¤
Purpose: Decoupled communication between framework components Components: Event management, observers, lifecycle notifications Dependencies: Core framework, logging system
PANTHER's observer pattern enables loose coupling between components through event-driven communication, supporting real-time monitoring and coordinated component interactions.
Architecture Overview¤
# filepath: /Users/elniak/Documents/Project/PANTHER/panther/core/observer/
# Event-based communication system
# - Publishers emit events without knowing subscribers
# - Observers react to events they're interested in
# - Framework lifecycle events coordinate component behavior
Event Flow¤
- Event Registration → Components register as observers for specific events
- Event Publishing → Framework components emit events at key moments
- Event Distribution → Event manager notifies all registered observers
- Observer Reaction → Observers process events and potentially emit new ones
- Lifecycle Coordination → Framework components coordinate through events
Core Components¤
Event Management System¤
Location: panther/core/observer/
The observer system manages event subscription and distribution:
# filepath: /Users/elniak/Documents/Project/PANTHER/panther/core/observer/
class EventManager:
"""Central event coordination and distribution"""
def register_observer(self, event_type: str, observer: Observer):
"""Register an observer for specific event types"""
def publish_event(self, event: Event):
"""Publish event to all registered observers"""
def unregister_observer(self, event_type: str, observer: Observer):
"""Remove observer registration"""
Logger Observer¤
Location: panther/core/observer/logger_observer.py
Built-in observer for experiment lifecycle logging:
# filepath: /Users/elniak/Documents/Project/PANTHER/panther/core/observer/logger_observer.py
class LoggerObserver:
"""Observer that logs framework events for debugging and monitoring"""
def on_experiment_started(self, event):
"""Log experiment initialization"""
def on_test_case_started(self, event):
"""Log individual test case execution"""
def on_plugin_loaded(self, event):
"""Log plugin loading and initialization"""
Event Types¤
Framework Lifecycle Events¤
Experiment Level:
experiment.started
— Experiment initialization beginsexperiment.configured
— Configuration loading completedexperiment.finished
— Experiment execution completedexperiment.error
— Experiment encountered fatal error
Test Case Level:
test_case.started
— Individual test case beginstest_case.setup_complete
— Test environment readytest_case.execution_started
— Test execution phase beginstest_case.results_collected
— Test outputs gatheredtest_case.finished
— Test case completedtest_case.error
— Test case failed or encountered error
Plugin Events¤
Plugin Lifecycle:
plugin.discovered
— Plugin found during discoveryplugin.loaded
— Plugin successfully importedplugin.initialized
— Plugin setup completedplugin.error
— Plugin initialization or execution failed
Service Events:
service.starting
— Service initialization beginsservice.ready
— Service available for testingservice.stopping
— Service shutdown initiatedservice.stopped
— Service cleanup completed
Environment Events¤
Network Environment:
network.setup_started
— Network environment creation beginsnetwork.ready
— Network infrastructure availablenetwork.teardown_started
— Network cleanup beginsnetwork.teardown_complete
— Network resources released
Execution Environment:
execution.environment_ready
— Execution context preparedexecution.environment_error
— Environment setup failed
Usage Patterns¤
Creating Custom Observers¤
from panther.core.observer.base_observer import Observer
class CustomMonitoringObserver(Observer):
"""Custom observer for experiment monitoring"""
def __init__(self):
self.test_metrics = {}
def on_test_case_started(self, event):
"""Track test case start times"""
self.test_metrics[event.test_name] = {
'start_time': event.timestamp,
'status': 'running'
}
def on_test_case_finished(self, event):
"""Calculate test duration and status"""
if event.test_name in self.test_metrics:
self.test_metrics[event.test_name].update({
'end_time': event.timestamp,
'duration': event.timestamp - self.test_metrics[event.test_name]['start_time'],
'status': 'completed' if event.success else 'failed'
})
Registering Observers¤
# filepath: /Users/elniak/Documents/Project/PANTHER/panther/core/experiment_manager.py
# Register observers during experiment initialization
event_manager = EventManager()
logger_observer = LoggerObserver()
custom_observer = CustomMonitoringObserver()
event_manager.register_observer("test_case.*", logger_observer)
event_manager.register_observer("test_case.*", custom_observer)
Publishing Events¤
# filepath: /Users/elniak/Documents/Project/PANTHER/panther/core/test_cases/test_case_impl.py
# Emit events at key points in test execution
def execute(self):
# Notify test case start
self.event_manager.publish_event(TestCaseStartedEvent(
test_name=self.name,
timestamp=datetime.now(),
configuration=self.config
))
try:
# Execute test logic
result = self._run_test()
# Notify successful completion
self.event_manager.publish_event(TestCaseFinishedEvent(
test_name=self.name,
timestamp=datetime.now(),
success=True,
result=result
))
except Exception as e:
# Notify error
self.event_manager.publish_event(TestCaseErrorEvent(
test_name=self.name,
timestamp=datetime.now(),
error=str(e)
))
Integration Points¤
With Experiment Engine¤
The experiment manager uses events to coordinate component interactions:
# filepath: /Users/elniak/Documents/Project/PANTHER/panther/core/experiment_manager.py
def run_tests(self):
"""Execute tests with event coordination"""
self.event_manager.publish_event(ExperimentStartedEvent())
for test_case in self.test_cases:
# Events coordinate setup, execution, and cleanup
test_case.execute() # Emits test case lifecycle events
self.event_manager.publish_event(ExperimentFinishedEvent())
With Plugin System¤
Plugins can both observe and emit events:
# Plugin implementations can observe framework events
class PluginImplementation:
def __init__(self, event_manager):
self.event_manager = event_manager
event_manager.register_observer("service.ready", self.on_service_ready)
def on_service_ready(self, event):
"""React to service availability"""
if event.service_name == self.target_service:
self.start_interaction()
With Results System¤
Results collection can be event-driven:
# filepath: /Users/elniak/Documents/Project/PANTHER/panther/core/results/result_collector.py
class ResultCollector(Observer):
"""Collect results based on test case events"""
def on_test_case_finished(self, event):
"""Automatically collect results when test completes"""
self.collect_test_artifacts(event.test_name)
self.validate_results(event.result)
Real-Time Monitoring¤
Progress Tracking¤
class ProgressObserver(Observer):
"""Track experiment progress in real-time"""
def __init__(self):
self.total_tests = 0
self.completed_tests = 0
self.failed_tests = 0
def on_experiment_started(self, event):
self.total_tests = len(event.test_cases)
print(f"Starting experiment with {self.total_tests} tests")
def on_test_case_finished(self, event):
self.completed_tests += 1
if not event.success:
self.failed_tests += 1
progress = (self.completed_tests / self.total_tests) * 100
print(f"Progress: {progress:.1f}% ({self.completed_tests}/{self.total_tests})")
Metrics Collection¤
class MetricsObserver(Observer):
"""Collect performance and execution metrics"""
def on_test_case_started(self, event):
self.start_time = time.time()
def on_test_case_finished(self, event):
duration = time.time() - self.start_time
self.metrics.append({
'test_name': event.test_name,
'duration': duration,
'success': event.success
})
Extending the Observer System¤
Custom Event Types¤
Define events for domain-specific needs:
@dataclass
class CustomProtocolEvent:
"""Custom event for protocol-specific notifications"""
protocol: str
message_type: str
payload: dict
timestamp: datetime
Asynchronous Observers¤
For I/O intensive operations:
import asyncio
class AsyncObserver(Observer):
"""Observer with asynchronous event processing"""
async def on_event_async(self, event):
"""Process events asynchronously"""
await self.perform_io_operation(event)
def on_event(self, event):
"""Synchronous wrapper that schedules async processing"""
asyncio.create_task(self.on_event_async(event))
Debugging Observer Issues¤
- Enable event logging to trace event flow
- Check observer registration timing
- Verify event type matching (wildcards vs. specific types)
- Monitor for observer exceptions that could break event chains
Related Documentation:
- Experiment Engine — How events coordinate experiment execution
- Plugin Development — Integrating events in plugins