# Source code for magpie.task.flower
import os
import signal
from threading import Thread
from duct import cmd
from loguru import logger
from magpie.util import children_pids
# Module-level record of the process launched by `start_flower`: it is assigned
# there and reset to None by `stop_flower`. None means no process is tracked.
flower_pid = None
"""PID of the Flower process"""
# [docs]
def start_flower():
    """Launch [Flower](https://flower.readthedocs.io/), the Celery monitoring tool,
    as a child process of the current one.

    The pid reported by the started command is stored in the module-level
    `flower_pid`, and a background thread is started that waits on the
    process handle until the process ends.
    """
    global flower_pid

    flower_command = cmd(
        'uv', 'run', 'celery', '-A', 'magpie.task.queue', 'flower', '--loglevel=INFO',
    )
    # unchecked: we do not want the waiting thread below to fail if the
    # process gets killed
    handle = flower_command.unchecked().start()

    flower_pid = handle.pids()[0]
    logger.debug(f'Started flower process with pid={flower_pid}')
    logger.debug('Access it at: http://localhost:5555/')

    # block on the process in a background thread rather than in the caller
    waiter = Thread(target=handle.wait)
    waiter.start()
# [docs]
def stop_flower(now=False):
    """Stop a running [Flower](https://flower.readthedocs.io/) process.

    Does nothing when no process is currently tracked (`flower_pid` is None).
    Otherwise a signal is sent and the tracked pid is reset to None. This
    function only sends the signal; it does not wait for the process to exit.

    A process that has already exited is treated as stopped: the resulting
    `ProcessLookupError` from `os.kill` is ignored and the tracked pid is
    still cleared, so later calls do not keep failing on a stale pid.

    Args:
        now: whether to perform a cold shutdown (send SIGKILL to kill the
            process instantly) or a warm shutdown (send SIGTERM and let the
            process terminate by itself)
    """
    global flower_pid
    if flower_pid is None:
        return

    if now:
        # find the pid of the actual flower process, which is the child of the `uv` one
        try:
            target_pid = children_pids(flower_pid)[0]
        except IndexError:
            # children pids list was empty -> we have no children
            target_pid = flower_pid
        sig = signal.SIGKILL
    else:
        # SIGTERM is properly forwarded to children
        target_pid = flower_pid
        sig = signal.SIGTERM

    try:
        os.kill(target_pid, sig)
    except ProcessLookupError:
        # the process is already gone: nothing left to stop
        pass

    flower_pid = None