# Source code for magpie.task.monitor
import atexit
import time
from threading import Event, Thread
from celery.events import EventReceiver
from loguru import logger
from magpie.util import short_str
# Module-level state shared between the caller's thread (which registers tasks)
# and the monitoring thread (which removes them as their events arrive).

# TODO: investigate if using `celery.events.state.State` instead of our own dict
# might be beneficial
pending_tasks = {} # maps task_id -> callback (see `add_task`)
"""Dict of tasks that have been submitted with a callback to be executed with the
result when the corresponding task finishes."""
all_tasks_complete = Event()
"""`threading.Event` that signals when all tasks have been completed."""
# no task is pending at import time, so the event starts in the "set" state
all_tasks_complete.set()
# assigned by `start_task_monitoring_thread`; None until then
monitoring_thread = None
"""Global monitoring thread object."""
[docs]
def start_task_monitoring_thread():
    """Start a monitoring thread that will inspect the celery workers and notify
    back the app when tasks have finished processing, along with their results.

    If a monitoring thread is already running, a warning is logged and nothing
    else happens. A thread object that exists but is no longer alive (it was
    stopped, or it died) is replaced by a fresh one, so monitoring can be
    restarted.
    """
    global monitoring_thread

    if monitoring_thread is not None and monitoring_thread.is_alive():
        logger.warning('Trying to start task monitoring thread but one is already running')
        return

    logger.debug('Starting task monitoring thread')
    monitoring_thread = EventsMonitor()
    monitoring_thread.start()
[docs]
def add_task(task_id, callback):
    """Register `callback` to be run with the result of task `task_id`.

    The task is recorded in `pending_tasks` and `all_tasks_complete` is
    cleared, so that `wait_for_tasks_completion` blocks until the monitoring
    thread has seen every registered task finish.
    """
    # record the task before flagging that work is outstanding
    pending_tasks[task_id] = callback
    all_tasks_complete.clear()
[docs]
def wait_for_tasks_completion(timeout=None):
    """Blocking wait for tasks to complete.

    Args:
        timeout: maximum number of seconds to wait, or None to wait
            indefinitely.

    Returns:
        True if all tasks completed, False if `timeout` elapsed first.
    """
    return all_tasks_complete.wait(timeout=timeout)
[docs]
def stop_task_monitoring_thread():
    """Stop the task monitoring thread.

    Asks the thread's event receiver to stop capturing events by setting its
    `should_stop` flag. Does nothing if no monitoring thread was started, or
    if the thread has not created its event receiver yet.
    """
    logger.debug('Stopping task monitoring thread')
    if monitoring_thread is None:
        return

    # `recv` is only assigned once the thread has built its event receiver, so
    # it may be missing (or None) if we are called very early
    recv = getattr(monitoring_thread, 'recv', None)
    if recv is None:
        logger.debug('Task monitoring thread has no event receiver yet, nothing to stop')
        return

    recv.should_stop = True
# Registered when this module is imported: at interpreter exit, wait up to
# 3 seconds for tasks that are still being processed before exiting.
atexit.register(lambda: wait_for_tasks_completion(timeout=3))
# see reference for event types:
# https://docs.celeryq.dev/en/stable/userguide/monitoring.html#event-reference
[docs]
class EventsMonitor(Thread):
    """Events monitoring thread that captures events from the Celery worker cluster
    and acts upon those, in particular this will call the registered callbacks to be
    executed on tasks completion.

    Attributes:
        capp: the Celery app whose events are monitored.
        recv: the `EventReceiver` currently capturing events, or None as long
            as `run` has not managed to create one. Setting its `should_stop`
            flag to True makes `run` return.
    """

    def __init__(self):
        super().__init__()
        # we exit properly when calling `stop_task_monitoring_thread` but let's
        # have this thread as a daemon anyway.
        self.daemon = True

        # defined up-front so that it can be safely inspected before `run` has
        # established a connection
        self.recv = None

        # monitor events from our main app
        # (imported here rather than at module level, presumably to avoid a
        # circular import -- TODO confirm)
        import magpie.task.queue
        self.capp = magpie.task.queue.app

        # enable events on the workers if not done already
        self.capp.control.enable_events()

    def run(self):
        """Capture Celery events until asked to stop.

        Whenever capturing fails, a new connection is attempted after a delay
        that doubles on each consecutive failure and is reset once a receiver
        has been created again.
        """
        handlers = {
            'task-succeeded': self.on_success,
            'task-failed': self.on_error,
            '*': self.trace_other_events,
        }

        try_interval = 1
        while True:
            try:
                try_interval *= 2
                with self.capp.connection() as conn:
                    self.recv = EventReceiver(conn, handlers=handlers, app=self.capp)
                    try_interval = 1
                    logger.debug('Capturing Celery events...')
                    self.recv.capture(limit=None, timeout=None, wakeup=True)

            except (KeyboardInterrupt, SystemExit):
                # if we catch a KeyboardInterrupt in a background, we want to
                # "forward" it to the main thread
                import _thread as thread
                thread.interrupt_main()

            except Exception as e:
                logger.error(f"Failed to capture Celery events: '{e}', "
                             f"trying again in {try_interval} seconds.")
                logger.exception(e)
                time.sleep(try_interval)

            # `recv` is still None if we never managed to create a receiver
            # (e.g. the very first connection attempt failed): keep retrying
            # instead of crashing the thread on an AttributeError
            if self.recv is not None and self.recv.should_stop:
                return

    def on_success(self, event):
        """Handle a `task-succeeded` event.

        Runs the callback registered for the task (if any) with the task
        result, then signals `all_tasks_complete` if no task is left pending.
        An exception raised by the callback is logged and not propagated.
        """
        task_id = event['uuid']
        result = self.capp.AsyncResult(task_id).result
        logger.debug(f'Task {task_id} succeeded with result: {short_str(result)}')

        # a task that succeeded with no registered callback needs no action
        callback = pending_tasks.pop(task_id, None)
        if callback is not None:
            try:
                callback(result)
            except Exception as e:
                logger.error(f'Error while executing callback for task {task_id}: {e}')
                logger.exception(e)

        logger.debug(f'Remaining tasks: {len(pending_tasks)}')
        if not pending_tasks:
            all_tasks_complete.set()

    def on_error(self, event):
        """Handle a `task-failed` event.

        The task is removed from the pending ones without calling its
        callback, then `all_tasks_complete` is signalled if no task is left
        pending.
        """
        task_id = event['uuid']
        # use `.get` so that an event lacking this field cannot prevent the
        # task from being removed from the pending ones
        logger.debug(f'Task {task_id} failed and raised: {event.get("exception")}')

        pending_tasks.pop(task_id, None)
        if not pending_tasks:
            all_tasks_complete.set()

    def trace_other_events(self, event):
        """Handle every event type that has no dedicated handler.

        Routine events are ignored, workers going online/offline are logged
        (and events are re-enabled when a worker comes online), and anything
        else is reported with a warning.
        """
        event_type = event.get('type')

        if event_type in ('task-received', 'task-started', 'worker-heartbeat'):
            return

        if event_type == 'worker-online':
            # if a worker just came online, ensure that it will be sending us events
            self.capp.control.enable_events()

        if event_type in ('worker-online', 'worker-offline'):
            # strip the leading 'worker-' to get 'online' / 'offline'
            status = event_type[len('worker-'):]
            logger.info(f'Celery worker \'{event.get("hostname")}\' is now {status}')
            return

        logger.warning(f'not handling event type {event_type}: {event}')