
Observer Module API Reference¤

Core Interfaces¤

IObserver¤

class IObserver(ABC)

Core interface for all observer implementations in PANTHER.

Description: Defines the contract for event processing with deduplication protection and interest-based filtering.

Constructor: IObserver() - initializes processed_events_uuids list.

Attributes:

- processed_events_uuids: List[str] - UUIDs of previously processed events

Methods:

on_event(event: BaseEvent) -> None (abstract)¤

Handle an incoming event. Must be implemented by subclasses.

is_interested(event_type: str) -> bool¤

Check if this observer should process events of the given type. Default returns True (interested in all events).

get_priority() -> int¤

Get the priority for this observer. Higher values mean higher priority. Default returns 0.

_setup_logging(logger_name, log_level, enable_colors=True, output_file=None, structured_output=False)¤

Set up logging for an observer using LoggerFactory. Supports lazy file handler creation (files are only created when content is first written).
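
As an illustration of the contract above, here is a minimal sketch of a subclass. It does not import PANTHER: the IObserver class below is a stand-in that mirrors only the documented members, and FakeEvent with its uuid and name fields is hypothetical. This page does not say whether deduplication happens in the caller or in the observer, so the sketch assumes the observer checks processed_events_uuids itself.

```python
from abc import ABC, abstractmethod
from typing import List


class IObserver(ABC):
    """Stand-in mirroring the documented interface (not the PANTHER class)."""

    def __init__(self) -> None:
        self.processed_events_uuids: List[str] = []

    @abstractmethod
    def on_event(self, event) -> None:
        """Handle an incoming event."""

    def is_interested(self, event_type: str) -> bool:
        return True  # documented default: interested in all events

    def get_priority(self) -> int:
        return 0  # documented default priority


class FakeEvent:
    """Hypothetical event carrying only the fields this sketch needs."""

    def __init__(self, uuid: str, name: str) -> None:
        self.uuid = uuid
        self.name = name


class CountingObserver(IObserver):
    """Records the names of test events, skipping UUIDs it has already seen."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: List[str] = []

    def on_event(self, event) -> None:
        # Assumption: the observer itself guards against repeated UUIDs.
        if event.uuid in self.processed_events_uuids:
            return
        self.processed_events_uuids.append(event.uuid)
        self.seen.append(event.name)

    def is_interested(self, event_type: str) -> bool:
        return event_type.startswith("test.")

    def get_priority(self) -> int:
        return 5


observer = CountingObserver()
first = FakeEvent("a1", "test.started")
observer.on_event(first)
observer.on_event(first)  # same UUID, so the second delivery is ignored
```

Overriding is_interested and get_priority is optional; a subclass that only implements on_event receives every event type at priority 0.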


ITypedObserver¤

class ITypedObserver(IObserver)

Enhanced observer interface with typed event handlers for automatic event routing.

Description: Extends IObserver to provide specific on_* handler methods for each event type. The on_event() method automatically routes events to their typed handler based on type(event). Observers can override only the handlers they need.

Constructor: ITypedObserver() - initializes _event_handlers dict mapping event classes to handler methods.

Methods:

on_event(event: BaseEvent)¤

Main event handler that routes to specific typed handlers by looking up type(event) in _event_handlers. Falls back to on_unknown_event() for unregistered event types. Catches exceptions per handler to prevent cascading failures.

on_unknown_event(event: BaseEvent) -> bool¤

Handle unknown event types. The default implementation logs a debug-level message and returns True.

is_interested(event_type: str) -> bool¤

Checks if any handler class name contains the event_type string (partial string matching). Returns True if a match is found.
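
The routing and matching rules described above can be sketched as follows. This is a simplified stand-in, not the PANTHER implementation: the event classes are hypothetical, only two handlers are mapped, and errors are recorded in a list instead of being logged.

```python
class BaseEvent:
    """Hypothetical base event for this sketch."""


class ServiceReadyEvent(BaseEvent):
    pass


class StepSkippedEvent(BaseEvent):
    pass


class OtherEvent(BaseEvent):
    pass


class TypedObserverSketch:
    def __init__(self) -> None:
        self.calls = []
        # Maps event classes to bound handler methods.
        self._event_handlers = {
            ServiceReadyEvent: self.on_service_ready,
            StepSkippedEvent: self.on_step_skipped,
        }

    def on_event(self, event: BaseEvent) -> None:
        # Exact lookup on type(event); unmapped types use the fallback.
        handler = self._event_handlers.get(type(event), self.on_unknown_event)
        try:
            handler(event)
        except Exception as exc:
            # A failing handler must not break delivery of later events.
            self.calls.append(f"error:{exc}")

    def on_service_ready(self, event: BaseEvent) -> bool:
        self.calls.append("service_ready")
        return True

    def on_step_skipped(self, event: BaseEvent) -> bool:
        raise RuntimeError("handler failed")

    def on_unknown_event(self, event: BaseEvent) -> bool:
        self.calls.append("unknown")
        return True

    def is_interested(self, event_type: str) -> bool:
        # Partial string match against the mapped class names.
        return any(event_type in cls.__name__ for cls in self._event_handlers)


observer = TypedObserverSketch()
for event in (ServiceReadyEvent(), StepSkippedEvent(), OtherEvent()):
    observer.on_event(event)
```

Because a dictionary lookup on type(event) is exact, a subclass of a mapped event class would reach on_unknown_event in this sketch unless it is mapped as well.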

Typed Handler Methods (all return bool, default True):

Experiment: on_experiment_initialized, on_experiment_plugin_loading_started, on_experiment_plugin_loading_completed, on_experiment_plugin_loading_failed, on_experiment_test_cases_initialized, on_experiment_execution_started, on_experiment_execution_completed, on_experiment_execution_failed, on_experiment_finished_early, on_experiment_completed, on_experiment_failed

Test: on_test_created, on_test_setup_started, on_test_setup_completed, on_test_setup_failed, on_test_environment_setup_started, on_test_environment_setup_completed, on_test_environment_setup_failed, on_test_deployment_started, on_test_deployment_completed, on_test_deployment_failed, on_test_execution_started, on_test_step_started, on_test_step_completed, on_test_step_failed, on_test_assertions_started, on_test_assertion_checked, on_test_assertions_completed, on_test_assertions_failed, on_test_execution_completed, on_test_execution_failed, on_test_teardown_started, on_test_teardown_completed, on_test_completed, on_test_failed

Service: on_service_created, on_service_preparation_started, on_service_preparation_completed, on_service_preparation_failed, on_service_deployment_started, on_service_deployment_completed, on_service_deployment_failed, on_service_started, on_service_ready, on_service_health_check_passed, on_service_health_check_failed, on_service_stopped, on_service_error, on_service_destroyed, on_service_test_results, on_command_generation_started, on_command_generated, on_docker_build_started, on_docker_build_completed, on_tester_analysis_started, on_tester_analysis_completed

Environment: on_environment_created, on_environment_setup_started, on_environment_setup_completed, on_environment_setup_failed, on_environment_teardown_started, on_environment_teardown_completed, on_environment_error, on_network_setup_started, on_network_setup_completed, on_network_setup_failed, on_network_teardown_started, on_network_teardown_completed, on_execution_environment_setup_started, on_execution_environment_setup_completed, on_execution_environment_resource_monitoring, on_execution_environment_limit_exceeded

Step: on_step_execution_started, on_step_execution_completed, on_step_execution_failed, on_step_progress, on_step_unsupported, on_step_skipped

Assertion: on_assertions_validation_started, on_assertions_validation_completed, on_assertion_progress, on_assertion_result, on_assertion_error, on_assertion_unknown

Metrics: on_metric_collected, on_resource_metric, on_timing_metric, on_counter_metric, on_metrics_summary

Plugin: on_plugin_loading_started, on_plugin_loading_completed, on_plugin_loading_failed, on_plugin_initialized, on_plugin_started, on_plugin_stopped, on_plugin_error, on_plugin_service_created, on_plugin_service_started, on_plugin_service_stopped


IPluginObserver¤

class IPluginObserver(IObserver)

Interface for plugin-based observers with bidirectional event-plugin mapping.

Constructor: IPluginObserver() - initializes plugin_events and event_plugins defaultdicts.

Attributes:

- plugin_events: Dict[str, Set[str]] - Maps plugin IDs to their interested event types
- event_plugins: Dict[str, Set[str]] - Maps event types to interested plugin IDs

Abstract Methods:

register_plugin_events(plugin_id: str, event_types: List[str]) -> None¤

Register event types that a plugin is interested in.

unregister_plugin(plugin_id: str) -> None¤

Unregister a plugin and its event interests.

get_plugin_events(plugin_id: str) -> List[str]¤

Get the event types a plugin is interested in.

get_plugins_for_event(event_type: str) -> List[str]¤

Get plugins interested in a specific event type.
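
One way to keep the two mappings consistent is sketched below. The class name PluginEventIndex is hypothetical, and the sorted return values and the removal of empty event entries are choices made for the sketch, not documented behavior.

```python
from collections import defaultdict
from typing import Dict, List, Set


class PluginEventIndex:
    """Sketch of a bidirectional plugin <-> event-type index."""

    def __init__(self) -> None:
        self.plugin_events: Dict[str, Set[str]] = defaultdict(set)
        self.event_plugins: Dict[str, Set[str]] = defaultdict(set)

    def register_plugin_events(self, plugin_id: str, event_types: List[str]) -> None:
        # Update both directions together so they never disagree.
        for event_type in event_types:
            self.plugin_events[plugin_id].add(event_type)
            self.event_plugins[event_type].add(plugin_id)

    def unregister_plugin(self, plugin_id: str) -> None:
        for event_type in self.plugin_events.pop(plugin_id, set()):
            self.event_plugins[event_type].discard(plugin_id)
            if not self.event_plugins[event_type]:
                del self.event_plugins[event_type]

    def get_plugin_events(self, plugin_id: str) -> List[str]:
        # .get() avoids creating empty entries in the defaultdict.
        return sorted(self.plugin_events.get(plugin_id, set()))

    def get_plugins_for_event(self, event_type: str) -> List[str]:
        return sorted(self.event_plugins.get(event_type, set()))


index = PluginEventIndex()
index.register_plugin_events("reporter", ["test.completed", "test.failed"])
index.register_plugin_events("notifier", ["test.failed"])
interested_in_failures = index.get_plugins_for_event("test.failed")
index.unregister_plugin("notifier")
```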

PluginObserver (concrete implementation)¤

class PluginObserver(IPluginObserver)

Concrete implementation of plugin-based observer. Implements all abstract methods.

Additional Methods:

on_event(event: Event) -> None¤

Handle an event by notifying interested plugins via _notify_plugins().

_notify_plugins(event: Event, plugin_ids: List[str]) -> None¤

Notify specific plugins about an event. Default implementation logs notifications.

get_registered_plugins() -> List[str]¤

Get a list of all registered plugin IDs.

get_event_types() -> List[str]¤

Get a list of all event types that have interested plugins.


Management Classes¤

EventManager¤

class EventManager(LoggerMixin)

Central event coordination system. Implements singleton pattern.

Constructor: EventManager() - initializes observer registry, event history, metrics, and deduplication systems. Guarded by _initialized flag to prevent re-initialization.

Attributes:

- observers: Dict[str, List[Tuple[int, IObserver]]] - Map of event types to prioritized observers
- global_observers: List[Tuple[int, IObserver]] - Global observers that receive all events
- event_history: List[Tuple[datetime, BaseEvent]] - Recent events for debugging (max 1000)
- metrics: Dict - Processing metrics (processed, errors, by_type)
- duplicate_detection_enabled: bool - Content-based duplicate detection flag (default True)

Class Methods:

get_instance() -> EventManager¤

Get the singleton instance.

ensure_instance() -> EventManager¤

Alias for get_instance() with a clearer name.

reset_instance() -> None¤

Reset the singleton instance (for testing).
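
The singleton behavior with an _initialized guard can be sketched as below. How EventManager actually enforces the singleton is not stated on this page; the use of __new__ here is an assumption, and ManagerSketch is a hypothetical stand-in.

```python
class ManagerSketch:
    """Stand-in showing a singleton whose __init__ runs its body only once."""

    _instance = None

    def __new__(cls):
        # Reuse the existing instance if there is one.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        # __init__ runs on every ManagerSketch() call, so guard the setup.
        if getattr(self, "_initialized", False):
            return
        self.observers = {}
        self._initialized = True

    @classmethod
    def get_instance(cls) -> "ManagerSketch":
        return cls()

    @classmethod
    def reset_instance(cls) -> None:
        # The next call builds and initializes a fresh instance.
        cls._instance = None


first = ManagerSketch()
first.observers["test.completed"] = []
second = ManagerSketch.get_instance()

ManagerSketch.reset_instance()
fresh = ManagerSketch.get_instance()
```

Without the guard, every call to the constructor would clear the registry of the shared instance. Resetting between tests avoids observers leaking from one test into the next.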

Public Methods:

register_observer(observer: IObserver, event_types: List[str] = None, priority: int = 0) -> None¤

Register an observer. If event_types is None, registers as a global observer. Higher priority (larger number) observers are notified first. Skips duplicate registrations.
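
The registration rules can be sketched as follows. RegistrySketch, Obs, and observers_for are hypothetical names, and the way global and type-specific observers are merged into one notification order is an assumption; this page only states that higher priority is notified first.

```python
from typing import Dict, List, Optional, Tuple


class Obs:
    """Hypothetical observer identified by a name."""

    def __init__(self, name: str) -> None:
        self.name = name


class RegistrySketch:
    def __init__(self) -> None:
        self.observers: Dict[str, List[Tuple[int, Obs]]] = {}
        self.global_observers: List[Tuple[int, Obs]] = []

    def register_observer(
        self, observer: Obs, event_types: Optional[List[str]] = None, priority: int = 0
    ) -> None:
        if event_types is None:
            buckets = [self.global_observers]
        else:
            buckets = [self.observers.setdefault(t, []) for t in event_types]
        for bucket in buckets:
            # Skip duplicate registrations of the same observer object.
            if any(existing is observer for _, existing in bucket):
                continue
            bucket.append((priority, observer))
            bucket.sort(key=lambda pair: pair[0], reverse=True)

    def observers_for(self, event_type: str) -> List[Obs]:
        # Assumption: both lists are merged and ordered by priority.
        merged = self.global_observers + self.observers.get(event_type, [])
        ordered = sorted(merged, key=lambda pair: pair[0], reverse=True)
        return [observer for _, observer in ordered]


registry = RegistrySketch()
logger, metrics, results = Obs("logger"), Obs("metrics"), Obs("results")
registry.register_observer(logger)  # global, priority 0
registry.register_observer(metrics, ["test.completed"], priority=5)
registry.register_observer(results, ["test.completed"], priority=10)
registry.register_observer(results, ["test.completed"], priority=10)  # skipped

order = [o.name for o in registry.observers_for("test.completed")]
```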

register_observer_once(observer, observer_id, scope="global", event_types=None, priority=0)¤

Register an observer only if not already registered, with scope tracking. Returns the observer (existing or new).

unregister_observer(observer: IObserver, event_types: List[str] = None) -> None¤

Unregister an observer from specific or all event types.

notify(event: BaseEvent) -> bool¤

Main event distribution method. Performs content-based and time-based duplicate detection, validates the event, records it in history, and notifies matching observers. Returns True if event was processed.

get_event_history(event_type: str = None, limit: int = None) -> List[Tuple[datetime, BaseEvent]]¤

Get recent events, optionally filtered by type.

get_metrics() -> Dict[str, Any]¤

Get event processing metrics.

cleanup_scoped_observers(scope: str) -> None¤

Remove all observers from a specific scope.

get_scoped_observer_count(scope: str = None) -> Dict[str, int]¤

Get count of observers by scope.

has_observer(observer_id: str) -> bool¤

Check if an observer with the given ID is already registered.

get_registered_observer(observer_id: str) -> Optional[Tuple[IObserver, str]]¤

Get a registered observer by its ID. Returns (observer, scope) or None.

get_observer_by_type(observer_type) -> Optional[IObserver]¤

Find and return an observer by its class type. Searches global and event-specific observers.

enable_duplicate_detection(enabled: bool = True) -> None¤

Enable or disable content-based duplicate detection.

clear_signature_cache() -> None¤

Clear the content signature cache for duplicate detection.

set_event_context(context_id: str, context_data: Dict[str, Any]) -> None¤

Set context data for event correlation.

clear_event_context(context_id: str) -> None¤

Clear context data.

get_event_context(context_id: str) -> Dict[str, Any]¤

Get context data for event correlation.

cleanup_none_observers() -> int¤

Remove any None observers from all observer lists. Returns count of cleaned observers.

Module-Level Convenience Function:

def get_event_manager() -> EventManager

Get the EventManager singleton instance.


ResultsManager¤

class ResultsManager(IObserver)

Comprehensive manager for test results. Combines result collection, aggregation, and export.

Constructor: ResultsManager(output_dir: str = None) - creates output directory if needed.

Attributes: - aggregator: ResultAggregator - Result aggregation engine - exporter: ResultsExporter - Export functionality

Methods:

on_event(event: Event) -> None¤

Handle events, specifically looking for TestResultEvent and EnhancedResultEvent types.

is_interested(event_type: str) -> bool¤

Returns True for the event types test.result, test.case.result, test.suite.result, and enhanced.result, including event types that start with one of these prefixes.

get_priority() -> int¤

Returns 10 for early result processing.

register_callback(event_type: str, callback: Callable) -> None¤

Register a callback for a specific result event type. Use "*" for wildcard.

export_results(format_type: str, filename: str = None) -> str¤

Export collected results to a file. Supports "json", "csv", "html", "md". Returns path or empty string.

export_all_formats(basename: str = None) -> Dict[str, str]¤

Export results to all supported formats. Returns map of format types to file paths.

clear_results() -> None¤

Clear all collected results.

get_summary() -> Dict[str, Any]¤

Get result summary: total_tests, successful, failed, success_rate, start_time, end_time, duration, tests.

get_all_results() -> List[Dict[str, Any]]¤

Get all collected test results.

get_results_by_category(category: str) -> List[Dict[str, Any]]¤

Get results filtered by category.

get_results_by_tag(tag: str) -> List[Dict[str, Any]]¤

Get results that contain a specific tag.

get_category_stats() -> Dict[str, Dict[str, int]]¤

Get statistics by category ({category: {"success": n, "failure": n}}).

get_tag_stats() -> Dict[str, Dict[str, int]]¤

Get statistics by tag ({tag: {"success": n, "failure": n}}).
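
To make the returned shape concrete, the sketch below computes both statistics from a list of result dictionaries. The field names category, tags, and success are assumptions about the result records, and the helper functions are hypothetical, not part of ResultsManager.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List

Result = Dict[str, Any]
Stats = Dict[str, Dict[str, int]]


def _tally(results: List[Result], keys_of: Callable[[Result], Iterable[str]]) -> Stats:
    """Count successes and failures under every key a result maps to."""
    stats: Stats = defaultdict(lambda: {"success": 0, "failure": 0})
    for result in results:
        outcome = "success" if result["success"] else "failure"
        for key in keys_of(result):
            stats[key][outcome] += 1
    return dict(stats)


def category_stats(results: List[Result]) -> Stats:
    return _tally(results, lambda r: [r["category"]])


def tag_stats(results: List[Result]) -> Stats:
    # A result with several tags is counted once under each tag.
    return _tally(results, lambda r: r.get("tags", []))


sample = [
    {"category": "handshake", "success": True, "tags": ["quic", "smoke"]},
    {"category": "handshake", "success": False, "tags": ["quic"]},
    {"category": "migration", "success": True, "tags": []},
]
by_category = category_stats(sample)
by_tag = tag_stats(sample)
```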

ResultAggregator¤

class ResultAggregator

Aggregates test results from multiple test runs with thread safety.

Methods: add_result(result), get_summary(), get_results_by_test(test_name), get_all_results(), clear().

ResultsExporter¤

class ResultsExporter

Exports test results in various formats.

Supported Formats: json, csv, html, md

Methods: export(format_type, output_path), export_to_json(output_path, pretty=True), export_to_csv(output_path), export_to_html(output_path), export_to_markdown(output_path).


Factory System¤

ObserverFactory¤

class ObserverFactory

Factory for creating and configuring observer instances.

Constructor:

ObserverFactory(
    event_manager: Optional[EventManager] = None,
    observer_config: Optional[BaseObserverConfig] = None,
)

Registers default observer types: "logger" (LoggerObserver), "event_logger" (LoggerObserver), "metrics" (MetricsObserver), "storage" (StorageObserver), "experiment" (ExperimentObserver).

Methods:

create_observer(observer_type, name=None, auto_register=False, event_types=None, priority=0, **kwargs) -> IObserver¤

Create an observer instance of the specified type. Raises ValueError for unknown types. Optionally auto-registers with the event manager.

register_observer_type(name: str, observer_class: type[IObserver]) -> None¤

Register a custom observer type.

register_observer(name: str, observer: IObserver) -> None¤

Register an existing observer instance with a name.

unregister_observer(name: str) -> bool¤

Unregister a named observer. Returns True if found and removed.

get_observer(name: str) -> Optional[IObserver]¤

Get a registered observer by name.

get_all_observers() -> Dict[str, IObserver]¤

Get all registered observers.

get_available_types() -> List[str]¤

Get list of available observer types.

configure_observer_type(observer_type: str, config: Dict[str, Any]) -> None¤

Configure default parameters for an observer type.
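
The type registry behind create_observer, register_observer_type, get_available_types, and configure_observer_type can be sketched as below. FactorySketch and ConsoleObserver are hypothetical, and letting call-time keyword arguments override configured defaults is an assumption.

```python
from typing import Any, Dict, List


class ConsoleObserver:
    """Hypothetical custom observer with two constructor parameters."""

    def __init__(self, level: str = "INFO", prefix: str = "") -> None:
        self.level = level
        self.prefix = prefix


class FactorySketch:
    def __init__(self) -> None:
        self._types: Dict[str, type] = {}
        self._defaults: Dict[str, Dict[str, Any]] = {}

    def register_observer_type(self, name: str, observer_class: type) -> None:
        self._types[name] = observer_class

    def configure_observer_type(self, observer_type: str, config: Dict[str, Any]) -> None:
        self._defaults[observer_type] = dict(config)

    def get_available_types(self) -> List[str]:
        return sorted(self._types)

    def create_observer(self, observer_type: str, **kwargs: Any):
        if observer_type not in self._types:
            raise ValueError(f"Unknown observer type: {observer_type!r}")
        # Assumption: explicit kwargs win over configured defaults.
        params = {**self._defaults.get(observer_type, {}), **kwargs}
        return self._types[observer_type](**params)


factory = FactorySketch()
factory.register_observer_type("console", ConsoleObserver)
factory.configure_observer_type("console", {"level": "DEBUG", "prefix": "[obs] "})

configured = factory.create_observer("console")
overridden = factory.create_observer("console", level="ERROR")

try:
    factory.create_observer("missing")
    unknown_type_error = None
except ValueError as exc:
    unknown_type_error = str(exc)
```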

set_event_manager(event_manager: EventManager) -> None¤

Set the event manager for this factory.

register_with_event_manager(observer, event_types=None, priority=0) -> None¤

Register an observer with the event manager. Raises RuntimeError if no event manager set.

unregister_from_event_manager(observer: IObserver) -> None¤

Unregister an observer from the event manager. Raises RuntimeError if no event manager set.

set_observer_config(observer_config: BaseObserverConfig) -> None¤

Set the observer configuration for this factory.

batch_register_with_event_manager(observers: list[tuple]) -> None¤

Register multiple observers with the event manager. Tuples contain (observer, event_types, priority).

Module-Level Convenience Functions:

def get_observer_factory(global_config=None) -> ObserverFactory
def create_observer(observer_type: str, **kwargs) -> IObserver
def create_default_observers(config: Dict[str, Any]) -> List[IObserver]

Workflow System¤

WorkflowStateTracker¤

class WorkflowStateTracker(LoggerMixin)

Lightweight tracker for experiment workflow states with validated transitions.

Constructor: WorkflowStateTracker() - initializes empty state and history tracking.

Methods:

set_workflow_state(experiment_id: str, state: WorkflowState) -> bool¤

Set the workflow state for an experiment. Validates transitions against WORKFLOW_TRANSITIONS. New experiments must start with CREATED. Returns True if successful.

get_workflow_state(experiment_id: str) -> Optional[WorkflowState]¤

Get the current workflow state for an experiment.

force_fail_workflow(experiment_id: str, reason: str = "Forced failure") -> bool¤

Force a workflow to FAILED state regardless of current state. Used for error recovery.

clear_workflow_state(experiment_id: str) -> None¤

Clear the state for a specific workflow.

is_workflow_in_terminal_state(experiment_id: str) -> bool¤

Check if a workflow is in a terminal state (COMPLETED or FAILED).

get_all_workflow_states() -> Dict[str, str]¤

Get all current workflow states as {experiment_id: state_value}.

get_allowed_transitions(current_state_str: str) -> List[str]¤

Get list of allowed state transitions from the given state string.

get_state_history(experiment_id: str = None) -> List[dict]¤

Get state transition history, optionally filtered by experiment ID.

WorkflowState Enum¤

class WorkflowState(Enum):
    CREATED = "created"
    LOADING_PLUGINS = "loading_plugins"
    GENERATING_COMMANDS = "generating_commands"
    BUILDING_DOCKER = "building_docker"
    DEPLOYING = "deploying"
    RUNNING = "running"
    COLLECTING_OUTPUTS = "collecting_outputs"
    ANALYZING_RESULTS = "analyzing_results"
    REPORTING_RESULTS = "reporting_results"
    COMPLETED = "completed"
    FAILED = "failed"
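
The tracker rules described above (new experiments start in CREATED, transitions are validated, force_fail_workflow bypasses validation) can be sketched with a reduced enum. The contents of WORKFLOW_TRANSITIONS are not listed on this page, so the table below is hypothetical, as are TrackerSketch and the fields of its history records.

```python
from enum import Enum
from typing import Dict, List, Optional


class WorkflowState(Enum):
    # Reduced to four states to keep the sketch short.
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical transition table; the real one is not shown here.
WORKFLOW_TRANSITIONS = {
    WorkflowState.CREATED: {WorkflowState.RUNNING, WorkflowState.FAILED},
    WorkflowState.RUNNING: {WorkflowState.COMPLETED, WorkflowState.FAILED},
    WorkflowState.COMPLETED: set(),
    WorkflowState.FAILED: set(),
}


class TrackerSketch:
    def __init__(self) -> None:
        self._states: Dict[str, WorkflowState] = {}
        self._history: List[dict] = []

    def _record(self, experiment_id: str, new: WorkflowState, reason: str = "") -> None:
        old: Optional[WorkflowState] = self._states.get(experiment_id)
        self._states[experiment_id] = new
        self._history.append(
            {"experiment_id": experiment_id, "from": old, "to": new, "reason": reason}
        )

    def set_workflow_state(self, experiment_id: str, state: WorkflowState) -> bool:
        current = self._states.get(experiment_id)
        if current is None:
            allowed = state is WorkflowState.CREATED  # must start in CREATED
        else:
            allowed = state in WORKFLOW_TRANSITIONS[current]
        if allowed:
            self._record(experiment_id, state)
        return allowed

    def force_fail_workflow(self, experiment_id: str, reason: str = "Forced failure") -> bool:
        # No validation: used for error recovery from any state.
        self._record(experiment_id, WorkflowState.FAILED, reason)
        return True

    def is_workflow_in_terminal_state(self, experiment_id: str) -> bool:
        terminal = (WorkflowState.COMPLETED, WorkflowState.FAILED)
        return self._states.get(experiment_id) in terminal


tracker = TrackerSketch()
rejected_start = tracker.set_workflow_state("exp-1", WorkflowState.RUNNING)
created = tracker.set_workflow_state("exp-1", WorkflowState.CREATED)
skipped_ahead = tracker.set_workflow_state("exp-1", WorkflowState.COMPLETED)
running = tracker.set_workflow_state("exp-1", WorkflowState.RUNNING)
tracker.force_fail_workflow("exp-1", reason="container crashed")
```

Rejected transitions leave both the current state and the history untouched in this sketch, so the history contains only transitions that actually happened.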

Plugin System¤

EventObserverPlugin¤

Base class for plugin-based observer implementations. See source for details.

PluginObserverFactory¤

Factory for dynamic plugin observer loading and management. See source for details.


Utility Classes¤

EventColors¤

Color mapping utilities for event display in terminal output.