Source code for magpie
from loguru import logger
from magpie import config
from magpie.datamodel_patch import patch_all
# Monkey-patch utility methods onto the datamodel. This call sits at module
# level, so the patching happens as a side effect of importing this module.
patch_all()
[docs]
def init():
    """Bring up Magpie's background task machinery.

    Calling this function will:

    - launch the monitoring thread, which collects results asynchronously
      once tasks have finished computing
    - launch a celery worker process that consumes and processes tasks from
      the queue (only when `config.WORKER_START` is set and no worker was
      found already running)
    - launch the celery flower task monitoring system (only when
      `config.FLOWER_START` is set)
    """
    # Deliberately imported lazily rather than at module level: this keeps
    # "import magpie.config" fast, since it then doesn't have to pull in all
    # the (quite heavy) LLM modules.
    from magpie.task import flower, monitor, queue, worker

    logger.debug(f'Using cache dir: {config.CACHE_DIR}')
    logger.debug(f'Using log dir: {config.LOG_DIR}')
    # logger.debug(f'Config:\n{config.to_str()}')

    # Ask the queue which workers answer right now; the result drives both
    # the sanity check below and the decision to spawn our own worker.
    running_workers = queue.ping()
    if not (running_workers or config.WORKER_START):
        logger.error('App is not configured to start a worker and none could be found at the moment')
        logger.error('Please make sure to run a celery worker to process tasks or set `config.WORKER_START=True`')

    # The monitoring thread is what invokes callbacks on the main thread
    # once tasks are finished.
    monitor.start_task_monitoring_thread()

    # Worker process: only spawn our own when asked to AND none is running yet.
    if config.WORKER_START:
        if not running_workers:
            worker.start_worker()
        else:
            logger.error('You have asked to start a worker cluster via `config.WORKER_START=True`')
            logger.error('However we found one already running, so we are not going to start our own.')

    # Flower process.
    if config.FLOWER_START:
        flower.start_flower()
[docs]
def shutdown():
    """Stop the task queue machinery: monitoring thread, worker and flower.

    The worker and flower are stopped immediately (`now=True`) when the
    monitor reports that all tasks are complete; otherwise the value of
    `config.WORKER_COLD_SHUTDOWN` decides.
    """
    # Deliberately imported lazily rather than at module level: this keeps
    # "import magpie.config" fast, since it then doesn't have to pull in all
    # the (quite heavy) LLM modules.
    from magpie.task import flower, monitor, worker

    # With no tasks in flight a fast shutdown is safe; otherwise we respect
    # the configured option.
    immediate = True if monitor.all_tasks_complete.is_set() else config.WORKER_COLD_SHUTDOWN

    monitor.stop_task_monitoring_thread()
    worker.stop_worker(now=immediate)
    flower.stop_flower(now=immediate)

    # TODO: we should wait synchronously for everything to shutdown properly
    #       or do we not, and want to have a separate `wait_for_shutdown()`
    #       function?