Source code for lisa.notifier

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import copy
import threading
from datetime import datetime
from functools import partial
from typing import Any, Dict, List, Optional, Type

from lisa import schema
from lisa.messages import MessageBase
from lisa.util import InitializableMixin, constants, subclasses
from lisa.util.logger import get_logger
from lisa.util.parallel import run_in_parallel

_get_init_logger = partial(get_logger, "init", "notifier")


class Notifier(subclasses.BaseClassWithRunbookMixin, InitializableMixin):
    """
    Base class for notifiers.

    A subclass declares the message types it wants through
    `_subscribed_message_type` and handles each delivered message in
    `_received_message`. `_initialize` and `finalize` are optional hooks.
    """

    def __init__(self, runbook: schema.TypedSchema) -> None:
        super().__init__(runbook=runbook)
        # One logger per concrete notifier class, so output can be told apart.
        logger_name = type(self).__name__
        self._log = get_logger("notifier", logger_name)

    def finalize(self) -> None:
        """
        Hook invoked once all tests are done, including when the run failed.

        Override it to release resources or to do closing work, for example
        saving results to a file. The default implementation does nothing.
        """

    def _subscribed_message_type(self) -> List[Type[MessageBase]]:
        """
        Return the message types this notifier subscribes to.

        Messages of any other type are not passed in. Subclasses must
        override this method.
        """
        raise NotImplementedError("must specify supported message types")

    def _received_message(self, message: MessageBase) -> None:
        """
        Handle one subscribed message.

        Called by the notifier module when a subscribed message happens.
        Subclasses must override this method.
        """
        raise NotImplementedError

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        """
        Optional initialization hook; the default implementation does nothing.
        """
# Every notifier that has been initialized and registered, in registration
# order. finalize() walks this list.
_notifiers: List[Notifier] = []
# Maps a message type to the notifiers that subscribed to it.
_messages: Dict[type, List[Notifier]] = {}

# prevent concurrent message conflict.
# _message_queue holds pending messages; _message_queue_lock guards the queue
# itself, and _notifying_lock ensures only one message is dispatched at a time.
_message_queue: List[MessageBase] = []
_message_queue_lock = threading.Lock()
_notifying_lock = threading.Lock()

# Notifier types that are always added, even if the runbook omits them.
_system_notifiers = [constants.NOTIFIER_CONSOLE, constants.NOTIFIER_FILE]


def initialize(runbooks: List[schema.Notifier]) -> None:
    """
    Create and register a notifier for each enabled runbook entry.

    System notifiers missing from `runbooks` are appended to it first, so the
    list passed in is modified in place. Disabled entries are skipped with a
    debug log line.

    Args:
        runbooks: notifier runbook entries; may be extended by this call.
    """
    factory = subclasses.Factory[Notifier](Notifier)
    log = _get_init_logger()

    # add system notifiers to provide troubleshooting information
    # The names are materialized into a set on purpose: a generator would be
    # consumed by the first membership test, so later system notifiers would
    # look "missing" and be appended a second time.
    names = {x.type.lower() for x in runbooks}
    for system_notifier in _system_notifiers:
        if system_notifier not in names:
            runbooks.append(schema.Notifier(type=system_notifier))

    for runbook in runbooks:
        if not runbook.enabled:
            log.debug(f"skipped notifier [{runbook.type}], because it's not enabled.")
            continue

        notifier = factory.create_by_runbook(runbook=runbook)
        register_notifier(notifier)


def register_notifier(notifier: Notifier) -> None:
    """
    register internal notifiers

    Initializes the notifier, records it for finalize(), and subscribes it to
    every message type it reports via `_subscribed_message_type`.

    Args:
        notifier: the notifier instance to initialize and register.
    """
    notifier.initialize()
    _notifiers.append(notifier)

    subscribed_message_types: List[
        Type[MessageBase]
    ] = notifier._subscribed_message_type()

    for message_type in subscribed_message_types:
        _messages.setdefault(message_type, []).append(notifier)

    log = _get_init_logger()
    log.debug(
        f"registered [{notifier.type_name()}] "
        f"on messages: {[x.type for x in subscribed_message_types]}"
    )


def notify(message: MessageBase) -> None:
    """
    Stamp a message with the current time and deliver it to subscribers.

    The message is queued, then queued messages are dispatched one at a time,
    oldest first. A message is delivered to the notifiers subscribed to its
    own type and to each base type up to and including MessageBase. Every
    notifier receives its own deep copy of the message.

    Args:
        message: the message to send; its `time` attribute is overwritten.
    """
    # NOTE: utcnow() yields a naive datetime expressed in UTC.
    message.time = datetime.utcnow()

    # to make sure message get order as possible, use a queue to hold messages.
    with _message_queue_lock:
        _message_queue.append(message)

    while len(_message_queue) > 0:
        # send message one by one
        with _notifying_lock:
            with _message_queue_lock:
                current_message: Optional[MessageBase] = None
                if len(_message_queue) > 0:
                    # pop(0) takes the oldest entry so messages keep their
                    # arrival order; a bare pop() would take the newest one.
                    current_message = _message_queue.pop(0)

            # Another thread may have drained the queue in the meantime.
            if current_message is not None:
                # Walk the class hierarchy so subscribers of a base message
                # type also receive derived messages.
                for message_type in type(current_message).__mro__:
                    notifiers = _messages.get(message_type, [])
                    if notifiers:
                        # presumably run_in_parallel runs the callables
                        # concurrently; each gets a private copy so one
                        # notifier cannot affect another's view of the message.
                        run_in_parallel(
                            [
                                partial(
                                    x._received_message,
                                    message=copy.deepcopy(current_message),
                                )
                                for x in notifiers
                            ]
                        )
                    if message_type == MessageBase:
                        # skip the object type
                        break


def finalize() -> None:
    """
    Call finalize() on every registered notifier.

    An exception raised by one notifier is logged through that notifier's
    logger and does not stop the remaining notifiers from being finalized.
    """
    for notifier in _notifiers:
        try:
            notifier.finalize()
        except Exception as identifier:
            notifier._log.exception(identifier)