# Source code for magpie.task.monitor

import atexit
import time
from threading import Event, Thread

from celery.events import EventReceiver
from loguru import logger

from magpie.util import short_str

# TODO: investigate if using `celery.events.state.State` instead of our own dict
#       might be beneficial
pending_tasks = dict()  # maps task_id -> callback
"""Registry of submitted tasks: each task id is mapped to the callback that
must be executed with the result once the corresponding task finishes."""

all_tasks_complete = Event()
"""`threading.Event` used to signal that all tasks have been completed."""
all_tasks_complete.set()  # nothing is pending at import time

monitoring_thread = None
"""Module-wide handle on the monitoring thread object."""


def start_task_monitoring_thread():
    """Start a monitoring thread that will inspect the celery workers and notify
    back the app when tasks have finished processing, along with their results.

    If a monitoring thread already exists and is still alive, a warning is
    logged and nothing else happens. A handle on a thread that has already
    terminated is replaced by a freshly started thread.
    """
    global monitoring_thread

    # a non-None handle is not enough: the thread may have terminated since it
    # was created, in which case a new one has to be started
    if monitoring_thread is not None and monitoring_thread.is_alive():
        logger.warning('Trying to start task monitoring thread but one is already running')
        return

    logger.debug('Starting task monitoring thread')
    monitoring_thread = EventsMonitor()
    monitoring_thread.start()
def add_task(task_id, callback):
    """Register `callback` for the task identified by `task_id`.

    The pair is stored in `pending_tasks` and the `all_tasks_complete` event
    is cleared, as there is now at least one task pending.
    """
    pending_tasks.update({task_id: callback})
    all_tasks_complete.clear()
def wait_for_tasks_completion(timeout=None):
    """Blocking wait for tasks to complete.

    `timeout` is the maximum time to wait, in seconds; `None` waits
    indefinitely.

    Returns `True` if the `all_tasks_complete` event was set, `False` if the
    timeout elapsed first.
    """
    # forward the result of `Event.wait` so callers can tell a timeout apart
    # from an actual completion
    return all_tasks_complete.wait(timeout=timeout)
def stop_task_monitoring_thread():
    """Stop the task monitoring thread.

    This flags the thread's event receiver with `should_stop`. If no
    monitoring thread was started this does nothing; if the thread has not
    created an event receiver yet, a warning is logged instead of failing.
    """
    logger.debug('Stopping task monitoring thread')
    if monitoring_thread is None:
        return

    # `recv` is only assigned by the thread once it has opened a connection,
    # so it may be missing if we are asked to stop very early
    receiver = getattr(monitoring_thread, 'recv', None)
    if receiver is None:
        logger.warning('Task monitoring thread has no event receiver yet, '
                       'cannot signal it to stop')
        return

    receiver.should_stop = True
# wait some time before exiting in case we still have tasks being processed atexit.register(lambda: wait_for_tasks_completion(timeout=3)) # see reference for event types: # https://docs.celeryq.dev/en/stable/userguide/monitoring.html#event-reference
class EventsMonitor(Thread):
    """Events monitoring thread that captures events from the Celery worker
    cluster and acts upon those, in particular this will call the registered
    callbacks to be executed on tasks completion.
    """

    def __init__(self):
        Thread.__init__(self)
        # we exit properly when calling `stop_task_monitoring_thread` but let's
        # have this thread as a daemon anyway.
        self.daemon = True

        # the event receiver only exists once `run` has opened a connection;
        # define the attribute upfront so it can always be inspected safely
        self.recv = None

        # monitor events from our main app
        import magpie.task.queue
        self.capp = magpie.task.queue.app

        # enable events on the workers if not done already
        self.capp.control.enable_events()

    def run(self):
        """Capture Celery events until the receiver is flagged with `should_stop`.

        Any failure while connecting or capturing is logged and followed by a
        pause of `try_interval` seconds before a new attempt is made.
        """
        handlers = {
            'task-succeeded': self.on_success,
            'task-failed': self.on_error,
            '*': self.trace_other_events,
        }

        try_interval = 1
        while True:
            try:
                try_interval *= 2
                with self.capp.connection() as conn:
                    self.recv = EventReceiver(conn, handlers=handlers, app=self.capp)
                    try_interval = 1
                    logger.debug('Capturing Celery events...')
                    self.recv.capture(limit=None, timeout=None, wakeup=True)

            except (KeyboardInterrupt, SystemExit):
                # if we catch a KeyboardInterrupt in a background, we want to
                # "forward" it to the main thread
                import _thread as thread
                thread.interrupt_main()

            except Exception as e:
                logger.error(f"Failed to capture Celery events: '{e}', "
                             f"trying again in {try_interval} seconds.")
                logger.exception(e)
                time.sleep(try_interval)

            # `self.recv` is still None if no receiver could be created yet
            # (e.g. the very first connection attempt failed): keep retrying
            # in that case instead of failing on the attribute access
            if self.recv is not None and self.recv.should_stop:
                return

    def on_success(self, event):
        """Handle a `task-succeeded` event.

        Runs the callback registered for the task (if any) with the task
        result, then sets `all_tasks_complete` if no pending task remains.
        """
        task_id = event['uuid']
        r = self.capp.AsyncResult(task_id)
        logger.debug(f'Task {task_id} succeeded with result: {short_str(r.result)}')

        # a task that succeeded with no registered callback needs no action
        callback = pending_tasks.pop(task_id, None)
        if callback is not None:
            try:
                callback(r.result)
            except Exception as e:
                # a failing callback must not take the monitoring thread down
                logger.error(f'Error while executing callback for task {task_id}: {e}')
                logger.exception(e)

        logger.debug(f'Remaining tasks: {len(pending_tasks)}')
        if not pending_tasks:
            all_tasks_complete.set()

    def on_error(self, event):
        """Handle a `task-failed` event.

        The task is removed from `pending_tasks` without running its callback,
        then `all_tasks_complete` is set if no pending task remains.
        """
        task_id = event['uuid']
        logger.debug(f'Task {task_id} failed and raised: {event["exception"]}')
        pending_tasks.pop(task_id, None)
        if not pending_tasks:
            all_tasks_complete.set()

    def trace_other_events(self, event):
        """Handle every event type that has no dedicated handler.

        Some event types are silently ignored, workers going online/offline
        are logged, and anything else is reported with a warning.
        """
        event_type = event.get('type')
        if event_type in ('task-received', 'task-started', 'worker-heartbeat'):
            return

        if event_type == 'worker-online':
            # if a worker just came online, ensure that it will be sending us events
            self.capp.control.enable_events()

        if event_type in ('worker-online', 'worker-offline'):
            # strip the leading 'worker-' to only keep 'online' / 'offline'
            logger.info(f'Celery worker \'{event.get("hostname")}\' is now {event_type[7:]}')
            return

        logger.warning(f'not handling event type {event.get("type")}: {event}')