Source code for magpie.task.flower

import os
import signal
from threading import Thread

from duct import cmd
from loguru import logger

from magpie.util import children_pids

# Module-level handle on the running Flower instance.
# start_flower() stores here the first PID reported by the spawned command
# (the `uv run ...` wrapper, with Flower itself running as its child), and
# stop_flower() resets it to None. None means no Flower process is tracked.
flower_pid = None
"""PID of the Flower process"""


def start_flower():
    """Launch [Flower](https://flower.readthedocs.io/) as a child process.

    Flower is started through `uv run celery -A magpie.task.queue flower`
    so it can monitor the Celery workers. The first PID reported for the
    spawned command is stored in the module-level `flower_pid` so that
    `stop_flower()` can signal it later.

    The call returns as soon as the process is started; a background thread
    is left waiting on the process handle.
    """
    global flower_pid

    flower_command = cmd('uv', 'run', 'celery', '-A', 'magpie.task.queue',
                         'flower', '--loglevel=INFO')
    # unchecked: a non-zero exit status (e.g. when the process is killed)
    # must not raise inside the thread that waits on the handle
    handle = flower_command.unchecked().start()

    flower_pid = handle.pids()[0]
    logger.debug(f'Started flower process with pid={flower_pid}')
    logger.debug('Access it at: http://localhost:5555/')

    # wait for flower to finish in a background thread
    waiter = Thread(target=handle.wait)
    waiter.start()
def stop_flower(now=False):
    """Stop a running [Flower](https://flower.readthedocs.io/) process.

    Does nothing if no Flower process is currently tracked in `flower_pid`.
    If the tracked process has already exited, the stale PID is simply
    forgotten instead of raising.

    Args:
        now: whether to perform a cold shutdown (kill the process instantly
            with SIGKILL) or a warm shutdown (send SIGTERM and let the
            process exit on its own)

    Raises:
        OSError: if the signal cannot be delivered for a reason other than
            the process no longer existing (e.g. `PermissionError`). In that
            case `flower_pid` is left untouched.
    """
    global flower_pid

    if flower_pid is None:
        return

    if now:
        # find the pid of the actual flower process, which is the child of
        # the `uv` one
        try:
            target_pid = children_pids(flower_pid)[0]
        except IndexError:
            # children pids list was empty -> we have no children
            target_pid = flower_pid
        sig = signal.SIGKILL
    else:
        # SIGTERM is properly forwarded to children
        target_pid = flower_pid
        sig = signal.SIGTERM

    try:
        os.kill(target_pid, sig)
    except ProcessLookupError:
        # The process exited on its own before we could signal it. Without
        # this handler the stale pid would never be cleared and every later
        # call would fail again (or, after pid reuse, hit another process).
        logger.debug(f'Flower process with pid={target_pid} has already exited')

    flower_pid = None