Source code for magpie.task.worker

import os
import signal
import sys
from threading import Thread

from duct import cmd
from loguru import logger

from magpie import config
from magpie.util import children_pids

worker_pid = None
"""PID of the worker cluster process"""


[docs] def start_worker(): """Start a Celery worker cluster as a subprocess of this one.""" global worker_pid handle = (cmd('uv', 'run', 'celery', '-A', 'magpie.task.queue', 'worker', '-E', '-c', str(config.WORKER_COUNT), '--loglevel=INFO') .unchecked() # we do not want the monitoring thread to fail if the process is killed .start()) worker_pid = handle.pids()[0] logger.debug(f'Started worker process with pid={worker_pid}') # wait for worker to finish in a background thread Thread(target=lambda: handle.wait()).start()
[docs] def stop_worker(now=False): """Stop a running Celery worker cluster. Args: now: whether to perform a cold shutdown (kill all the workers instantly) or a warm shutdown (wait until all the workers exit) """ global worker_pid if worker_pid is None: return if now: # we need to find the children of the worker process and send the SIGKILL signal # to all of them, otherwise they will end up zombies if we just kill the worker process # on Linux we just need to send it to the worker process, see note here: # https://docs.celeryq.dev/en/stable/userguide/workers.html#stopping-the-worker # find the pid of the actual worker process, which is the child of the `uv` one try: actual_worker_pid = children_pids(worker_pid)[0] except IndexError: # children pids list was empty -> we have no children os.kill(worker_pid, signal.SIGKILL) return if not sys.platform.startswith('linux'): children_pid = children_pids(actual_worker_pid) os.kill(actual_worker_pid, signal.SIGKILL) if not sys.platform.startswith('linux'): for pid in children_pid: os.kill(pid, signal.SIGKILL) else: # SIGTERM is properly forwarded to children os.kill(worker_pid, signal.SIGTERM) worker_pid = None