# Observer System Quickstart
This tutorial demonstrates how to create and use observers to monitor events in PANTHER's testing framework.
## Basic Observer Creation

### Step 1: Create Your First Observer
from panther.core.observer.base.observer_interface import IObserver
from panther.core.events.base.event_base import BaseEvent
class MyFirstObserver(IObserver):
    """Console-logging observer: prints a short summary of each test event.

    Also keeps a running tally of how many distinct events it has handled.
    """

    def __init__(self):
        super().__init__()
        # Number of unique events handled so far.
        self.event_count = 0

    def is_interested(self, event_type: str) -> bool:
        """Accept only events in the ``test.*`` namespace."""
        return event_type.startswith("test.")

    def on_event(self, event: BaseEvent):
        """Print event details, skipping events already seen (by UUID)."""
        if event.uuid not in self.processed_events_uuids:
            # One line per field, then a separator.
            for line in (
                f"📅 {event.timestamp}",
                f"🎯 Event: {event.event_type}",
                f"🏷️ Entity: {event.entity_type}:{event.entity_id}",
                f"🆔 UUID: {event.uuid}",
                "---",
            ):
                print(line)
            # Remember the UUID so duplicates are ignored next time.
            self.processed_events_uuids.append(event.uuid)
            self.event_count += 1
# Test the observer
observer = MyFirstObserver()
# Simulate an event (normally events come from emitters)
from panther.core.events.test.events import TestStartedEvent
from datetime import datetime
# Build a synthetic "test started" event for entity test-001.
event = TestStartedEvent(
entity_id="test-001",
test_name="Connection Test",
start_time=datetime.now()
)
# Check interest and process
# event_type appears to be an enum; is_interested() takes its string .value.
if observer.is_interested(event.event_type.value):
observer.on_event(event)
print(f"Total events processed: {observer.event_count}")
### Step 2: Create a Specialized Observer
from typing import Dict, Any
import json
class TestMetricsObserver(IObserver):
    """Observer that collects per-test timing and status metrics."""

    def __init__(self):
        super().__init__()
        self.test_metrics = {}      # entity_id -> metrics record
        self.test_start_times = {}  # entity_id -> start timestamp

    def is_interested(self, event_type: str) -> bool:
        """Track only test lifecycle events (start / complete / fail)."""
        return event_type in ("test.started", "test.completed", "test.failed")

    def on_event(self, event: BaseEvent):
        """Dispatch lifecycle events to the matching recorder, once per UUID."""
        if event.uuid in self.processed_events_uuids:
            return
        kind = event.event_type.value
        if kind == "test.started":
            self._record_start(event.entity_id, event)
        elif kind in ("test.completed", "test.failed"):
            self._record_finish(event.entity_id, event, kind)
        self.processed_events_uuids.append(event.uuid)

    def _record_start(self, entity_id, event):
        """Remember when the test began and open its metrics record."""
        self.test_start_times[entity_id] = event.timestamp
        self.test_metrics[entity_id] = {
            "name": getattr(event, 'test_name', 'Unknown'),
            "status": "running",
            "start_time": event.timestamp.isoformat(),
        }

    def _record_finish(self, entity_id, event, kind):
        """Close the record with outcome, end time, duration and any error."""
        if entity_id not in self.test_start_times:
            # No matching start event was seen; nothing to close.
            return
        elapsed = (event.timestamp - self.test_start_times[entity_id]).total_seconds()
        record = self.test_metrics[entity_id]
        record["status"] = "completed" if kind == "test.completed" else "failed"
        record["end_time"] = event.timestamp.isoformat()
        record["duration_seconds"] = elapsed
        if hasattr(event, 'error_message'):
            record["error"] = event.error_message

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return status counts, the mean duration, and all raw records."""
        records = list(self.test_metrics.values())
        statuses = [m["status"] for m in records]
        durations = [m["duration_seconds"] for m in records
                     if "duration_seconds" in m]
        mean = sum(durations) / len(durations) if durations else 0
        return {
            "total_tests": len(records),
            "completed": statuses.count("completed"),
            "failed": statuses.count("failed"),
            "running": statuses.count("running"),
            "average_duration": mean,
            "tests": self.test_metrics,
        }

    def export_metrics(self, filename: str):
        """Write the summary to *filename* as indented JSON."""
        with open(filename, 'w') as out:
            json.dump(self.get_metrics_summary(), out, indent=2)
# Test the metrics observer
# BUG FIX: TestCompletedEvent / TestFailedEvent were used below without ever
# being imported (only TestStartedEvent was imported earlier), which raises
# NameError. Import them here so this snippet runs on its own.
from panther.core.events.test.events import TestCompletedEvent, TestFailedEvent

metrics_observer = TestMetricsObserver()
# Simulate a full lifecycle: one passing test and one failing test.
test_events = [
    TestStartedEvent(entity_id="test-001", test_name="HTTP Test", start_time=datetime.now()),
    TestCompletedEvent(entity_id="test-001", test_name="HTTP Test",
                       end_time=datetime.now(), duration=2.5),
    TestStartedEvent(entity_id="test-002", test_name="HTTPS Test", start_time=datetime.now()),
    TestFailedEvent(entity_id="test-002", test_name="HTTPS Test",
                    failure_time=datetime.now(), error_message="Connection timeout")
]
# Feed each event through the interest check, as an emitter would.
for event in test_events:
    if metrics_observer.is_interested(event.event_type.value):
        metrics_observer.on_event(event)
# View metrics
summary = metrics_observer.get_metrics_summary()
print(json.dumps(summary, indent=2))
## Using the Observer Factory

### Step 3: Factory-Based Observer Creation
from panther.core.observer.factory.observer_factory import get_observer_factory
# Get the global factory instance
factory = get_observer_factory()
# Create built-in observers with configuration
# Console logger: INFO level, colored, detailed format.
logger_observer = factory.create_observer("logger", {
"level": "INFO",
"colored": True,
"format": "detailed"
})
# Metrics sampler: CPU/memory every second, kept in memory.
metrics_observer = factory.create_observer("metrics", {
"interval": 1.0,
"metrics": ["cpu", "memory"],
"storage": "memory"
})
# Event persistence: compressed JSON-lines file on disk.
storage_observer = factory.create_observer("storage", {
"backend": "file",
"file_path": "test_events.jsonl",
"compression": True
})
print("Created observers:")
print(f"- Logger: {type(logger_observer).__name__}")
print(f"- Metrics: {type(metrics_observer).__name__}")
print(f"- Storage: {type(storage_observer).__name__}")
### Step 4: Register Custom Observer with Factory
# Register our custom observer type
# The schema maps config keys to their expected Python types; presumably the
# factory validates configs against it — confirm in the factory API docs.
factory.register_observer_type(
name="test_metrics",
class_type=TestMetricsObserver,
config_schema={
"export_file": str,
"auto_export": bool
}
)
# Now create via factory
custom_observer = factory.create_observer("test_metrics", {
"export_file": "custom_metrics.json",
"auto_export": True
})
print(f"Created custom observer: {type(custom_observer).__name__}")
## Working with Event Manager

### Step 5: Complete Observer Integration
from panther.core.observer.management.event_manager import EventManager
# Create event manager
event_manager = EventManager()
# Create and register observers
# Mix of factory-built and hand-constructed observers.
observers = [
factory.create_observer("logger", {"level": "INFO"}),
TestMetricsObserver(),
factory.create_observer("storage", {"backend": "file", "file_path": "events.log"})
]
for observer in observers:
event_manager.register_observer(observer)
print(f"Registered {len(observers)} observers")
# Emit events through manager (normally done by emitters)
test_events = [
TestStartedEvent(entity_id="integration-test", test_name="Full Integration",
start_time=datetime.now()),
]
# The manager fans each event out to every interested observer.
for event in test_events:
event_manager.emit_event(event)
print("Events emitted to all interested observers")
# Get observer processing stats
registered = event_manager.get_registered_observers()
print(f"Total registered observers: {len(registered)}")
## Advanced Observer Patterns

### Step 6: Filtered Observer
class FilteredTestObserver(IObserver):
    """Observer that records only events passing entity-id and name filters."""

    def __init__(self, entity_filter=None, test_name_pattern=None):
        super().__init__()
        # Empty list means "accept every entity".
        self.entity_filter = entity_filter or []
        # Case-insensitive substring required in the test name, or None.
        self.test_name_pattern = test_name_pattern
        self.filtered_events = []

    def is_interested(self, event_type: str) -> bool:
        """Accept only ``test.*`` events."""
        return event_type.startswith("test.")

    def on_event(self, event: BaseEvent):
        """Record the event only if it clears every configured filter."""
        # Skip duplicates.
        if event.uuid in self.processed_events_uuids:
            return
        # Entity-id filter (only applied when a filter list was given).
        if self.entity_filter and event.entity_id not in self.entity_filter:
            return
        # Name-pattern filter (only applied when the event carries a name).
        pattern = self.test_name_pattern
        if pattern and hasattr(event, 'test_name'):
            if pattern.lower() not in event.test_name.lower():
                return
        # Passed all filters: keep a compact summary of the event.
        self.filtered_events.append({
            "event_type": event.event_type.value,
            "entity_id": event.entity_id,
            "timestamp": event.timestamp.isoformat(),
            "test_name": getattr(event, 'test_name', 'N/A'),
        })
        self.processed_events_uuids.append(event.uuid)

    def get_filtered_events(self):
        """Return the events that passed all filters."""
        return self.filtered_events
# Use filtered observer
# Records only events for the two critical test entities whose test name
# contains "critical" (case-insensitive).
filtered_observer = FilteredTestObserver(
entity_filter=["critical-test-001", "critical-test-002"],
test_name_pattern="critical"
)
event_manager.register_observer(filtered_observer)
### Step 7: Async Observer (Advanced)
import asyncio
from typing import List
class AsyncLoggerObserver(IObserver):
    """Observer that queues events and processes them asynchronously.

    Events are accepted synchronously via ``on_event`` and handled by a
    background asyncio task started with ``start_processing``.
    """

    def __init__(self):
        super().__init__()
        # Unbounded queue: put_nowait() never raises QueueFull here, but the
        # handler in on_event() is kept for bounded-queue variants.
        self.event_queue = asyncio.Queue()
        self.processing_task = None
        self.processed_count = 0

    async def start_processing(self):
        """Start the background event-processing task."""
        self.processing_task = asyncio.create_task(self._process_events())

    async def stop_processing(self):
        """Cancel the background task and wait for it to finish."""
        if self.processing_task:
            self.processing_task.cancel()
            try:
                await self.processing_task
            except asyncio.CancelledError:
                pass

    def is_interested(self, event_type: str) -> bool:
        return True  # Process all events

    def on_event(self, event: BaseEvent):
        """Queue event for async processing (non-blocking)."""
        if event.uuid not in self.processed_events_uuids:
            try:
                self.event_queue.put_nowait(event)
            except asyncio.QueueFull:
                print(f"Warning: Event queue full, dropping event {event.uuid}")

    async def _process_events(self):
        """Async event processing loop.

        BUG FIX: task_done() now runs in a ``finally`` block, so a failure
        inside _process_event_async() can no longer leave the queue's
        unfinished-task counter unbalanced — previously that made
        ``event_queue.join()`` hang forever after any processing error.
        """
        while True:
            try:
                event = await self.event_queue.get()
            except asyncio.CancelledError:
                break  # stop_processing() cancelled us while waiting
            try:
                await self._process_event_async(event)
                self.processed_events_uuids.append(event.uuid)
                self.processed_count += 1
            except asyncio.CancelledError:
                raise  # cancelled mid-event: propagate after task_done()
            except Exception as e:
                print(f"Error processing event: {e}")
            finally:
                self.event_queue.task_done()

    async def _process_event_async(self, event: BaseEvent):
        """Async event processing - simulate I/O work."""
        # Simulate async logging to external service
        await asyncio.sleep(0.1)
        print(f"[ASYNC] Processed {event.event_type} for {event.entity_id}")
# Use async observer (in async context)
async def demo_async_observer():
    """End-to-end demo: queue five events and await their async processing."""
    async_observer = AsyncLoggerObserver()
    await async_observer.start_processing()
    try:
        # Queue five synthetic "test started" events.
        for i in range(5):
            async_observer.on_event(
                TestStartedEvent(
                    entity_id=f"async-test-{i}",
                    test_name=f"Async Test {i}",
                    start_time=datetime.now(),
                )
            )
        # Block until the background task has drained the queue.
        await async_observer.event_queue.join()
        print(f"Processed {async_observer.processed_count} events asynchronously")
    finally:
        # Always shut the background task down, even on error.
        await async_observer.stop_processing()
# Run async demo (uncomment to test)
# asyncio.run(demo_async_observer())
## Observer Configuration with YAML

### Step 8: Configuration-Driven Observers
# Create config file: observer_config.yaml
# Inline YAML describing three observers; each entry has a type, an enabled
# flag, and a type-specific config mapping.
yaml_config = """
observers:
  - type: logger
    enabled: true
    config:
      level: DEBUG
      colored: true
      format: detailed
  - type: test_metrics
    enabled: true
    config:
      export_file: test_results.json
      auto_export: true
  - type: storage
    enabled: true
    config:
      backend: file
      file_path: panther_events.jsonl
      compression: true
      retention_days: 7
"""
# Save config to file
with open("observer_config.yaml", "w") as f:
f.write(yaml_config)
# Load observers from config
from panther.core.observer.factory.factory_config import load_observer_config
observers = load_observer_config("observer_config.yaml")
print(f"Loaded {len(observers)} observers from config")
# Register all loaded observers
for observer in observers:
event_manager.register_observer(observer)
## Testing Your Observers

### Step 9: Observer Testing Patterns
import unittest
from unittest.mock import Mock
class TestMyObserver(unittest.TestCase):
    """Test suite for the TestMetricsObserver custom observer."""

    def setUp(self):
        # Fresh observer per test so state never leaks between cases.
        self.observer = TestMetricsObserver()

    def test_interest_filtering(self):
        """Test event type interest filtering."""
        self.assertTrue(self.observer.is_interested("test.started"))
        self.assertTrue(self.observer.is_interested("test.completed"))
        self.assertFalse(self.observer.is_interested("service.started"))

    def test_event_processing(self):
        """Test basic event processing."""
        event = TestStartedEvent(
            entity_id="test-123",
            test_name="Unit Test",
            start_time=datetime.now()
        )
        self.observer.on_event(event)
        self.assertIn("test-123", self.observer.test_metrics)
        self.assertEqual(self.observer.test_metrics["test-123"]["status"], "running")

    def test_duplicate_prevention(self):
        """Test duplicate event prevention."""
        event = TestStartedEvent(
            entity_id="test-123",
            test_name="Duplicate Test",
            start_time=datetime.now()
        )
        # Process the same event twice; the second call must be a no-op.
        self.observer.on_event(event)
        initial_count = len(self.observer.test_metrics)
        self.observer.on_event(event)  # Duplicate
        final_count = len(self.observer.test_metrics)
        self.assertEqual(initial_count, final_count)

    def test_metrics_calculation(self):
        """Test metrics calculation accuracy."""
        # BUG FIX: timedelta was used without being imported anywhere in this
        # tutorial (only `from datetime import datetime` appeared earlier),
        # so this test raised NameError. Import it here.
        from datetime import timedelta
        start_time = datetime.now()
        end_time = start_time + timedelta(seconds=5)
        start_event = TestStartedEvent(
            entity_id="test-duration",
            test_name="Duration Test",
            start_time=start_time
        )
        complete_event = TestCompletedEvent(
            entity_id="test-duration",
            test_name="Duration Test",
            end_time=end_time,
            duration=5.0
        )
        self.observer.on_event(start_event)
        self.observer.on_event(complete_event)
        summary = self.observer.get_metrics_summary()
        self.assertEqual(summary["total_tests"], 1)
        self.assertEqual(summary["completed"], 1)
        self.assertAlmostEqual(summary["average_duration"], 5.0, places=1)
# Run tests
# Execute the unittest suite when this file is run directly.
if __name__ == "__main__":
unittest.main()
## Best Practices

### Observer Design Guidelines
- **Interest Filtering**: Always implement `is_interested()` efficiently
- **Duplicate Handling**: Check `processed_events_uuids` in `on_event()`
- **Error Handling**: Wrap processing logic in try-catch blocks
- **Resource Management**: Clean up resources in destructor/cleanup methods
- **Thread Safety**: Use locks for shared state in multi-threaded environments
### Performance Tips
- **Batch Processing**: Process multiple events together when possible
- **Async I/O**: Use async observers for I/O-bound operations
- **Memory Management**: Limit the size of the `processed_events_uuids` list
- **Selective Filtering**: Filter events as early as possible
- **Lazy Evaluation**: Defer expensive calculations until needed
### Testing Strategies
- Unit Tests: Test individual observer methods
- Integration Tests: Test with real event manager
- Performance Tests: Measure processing throughput
- Error Tests: Verify graceful error handling
- Configuration Tests: Test factory creation with various configs
This tutorial covers the essential patterns for creating and using observers in PANTHER. See the API reference for complete details on all observer types and advanced configuration options.