# Source code for magpie.task.worker
import os
import signal
import sys
from threading import Thread
from duct import cmd
from loguru import logger
from magpie import config
from magpie.util import children_pids
# Module-level state shared by start_worker() and stop_worker().
# None means no worker cluster is currently tracked; start_worker() stores the
# first pid reported by the launched process handle here, and stop_worker()
# reads it to decide which process(es) to signal.
worker_pid = None
"""PID of the worker cluster process"""
[docs]
def start_worker():
    """Launch the Celery worker cluster as a child process.

    The cluster is started through ``uv run celery`` with ``config.WORKER_COUNT``
    concurrent workers. The first pid reported by the process handle is stored in
    the module-level ``worker_pid``, and a background thread is started that waits
    for the process to finish.
    """
    global worker_pid

    argv = [
        'uv', 'run', 'celery', '-A', 'magpie.task.queue', 'worker', '-E',
        '-c', str(config.WORKER_COUNT), '--loglevel=INFO',
    ]
    # unchecked: the monitoring thread must not fail if the process is killed
    expression = cmd(*argv).unchecked()
    proc = expression.start()

    worker_pid = proc.pids()[0]
    logger.debug(f'Started worker process with pid={worker_pid}')

    # block on the worker's exit in a background thread rather than in the caller
    monitor = Thread(target=proc.wait)
    monitor.start()
[docs]
def stop_worker(now=False):
    """Stop a running Celery worker cluster.

    Does nothing when no worker is tracked (``worker_pid`` is ``None``). After the
    signal(s) have been sent, ``worker_pid`` is reset to ``None`` on every path,
    including the cold-shutdown case where the launcher process has no children,
    so a later call never signals a stale pid.

    Args:
        now: whether to perform a cold shutdown (kill all the workers instantly) or a warm shutdown
            (wait until all the workers exit)
    """
    global worker_pid

    if worker_pid is None:
        return

    if now:
        # we need to find the children of the worker process and send the SIGKILL signal
        # to all of them, otherwise they will end up zombies if we just kill the worker process
        # on Linux we just need to send it to the worker process, see note here:
        # https://docs.celeryq.dev/en/stable/userguide/workers.html#stopping-the-worker

        # find the pid of the actual worker process, which is the child of the `uv` one
        try:
            actual_worker_pid = children_pids(worker_pid)[0]
        except IndexError:
            # children pids list was empty -> we have no children, kill the process itself
            os.kill(worker_pid, signal.SIGKILL)
        else:
            # off Linux, collect the worker's own children *before* killing it:
            # the lookup goes through the parent pid, which is about to disappear
            if sys.platform.startswith('linux'):
                pool_pids = []
            else:
                pool_pids = children_pids(actual_worker_pid)

            os.kill(actual_worker_pid, signal.SIGKILL)
            for pid in pool_pids:
                os.kill(pid, signal.SIGKILL)
    else:
        # SIGTERM is properly forwarded to children
        os.kill(worker_pid, signal.SIGTERM)

    worker_pid = None