Source code for magpie

from loguru import logger

from magpie import config
from magpie.datamodel_patch import patch_all

# Monkey patch utility methods onto the datamodel.
# This is a module-level call, so it runs as a side effect of importing
# this module, before `init()` is ever invoked.
patch_all()


def init():
    """Bring up the Magpie background machinery.

    Steps, in order:

    1. Log the cache and log directories taken from ``config``.
    2. Ping the task queue to see whether any worker is already reachable,
       and log an error if none is found while ``config.WORKER_START`` is
       disabled.
    3. Start the monitoring thread, which fetches results asynchronously
       and calls callbacks on the main thread once tasks are finished.
    4. If ``config.WORKER_START`` is enabled, start a celery worker to
       consume tasks from the queue -- unless the ping found one already
       running, in which case only an error is logged.
    5. If ``config.FLOWER_START`` is enabled, start the celery flower
       task monitoring system.
    """
    # Deliberately imported lazily rather than at module level: this keeps
    # "import magpie.config" fast, since it then does not have to pull in
    # all the (quite heavy) LLM modules.
    from magpie.task import flower, monitor, queue, worker

    logger.debug(f'Using cache dir: {config.CACHE_DIR}')
    logger.debug(f'Using log dir: {config.LOG_DIR}')
    # logger.debug(f'Config:\n{config.to_str()}')

    # workers that answered the ping, if any
    running_workers = queue.ping()

    # nobody is available to process tasks and we will not start anyone either
    if not (running_workers or config.WORKER_START):
        logger.error('App is not configured to start a worker and none could be found at the moment')
        logger.error('Please make sure to run a celery worker to process tasks or set `config.WORKER_START=True`')

    # the monitoring thread is started unconditionally
    monitor.start_task_monitoring_thread()

    # worker process: only start our own when none is running already
    if config.WORKER_START:
        if not running_workers:
            worker.start_worker()
        else:
            logger.error('You have asked to start a worker cluster via `config.WORKER_START=True`')
            logger.error('However we found one already running, so we are not going to start our own.')

    # flower process
    if config.FLOWER_START:
        flower.start_flower()
def shutdown():
    """Stop the monitoring thread, the worker and flower.

    The ``now`` flag handed to ``worker.stop_worker`` and
    ``flower.stop_flower`` is ``True`` when the monitor reports that all
    tasks are complete (a fast shutdown, since nothing is in flight);
    otherwise it is whatever ``config.WORKER_COLD_SHUTDOWN`` is set to.
    """
    # Deliberately imported lazily rather than at module level: this keeps
    # "import magpie.config" fast, since it then does not have to pull in
    # all the (quite heavy) LLM modules.
    from magpie.task import flower, monitor, worker

    # no tasks in flight -> shut down right away; otherwise respect the
    # configured option
    immediate = True if monitor.all_tasks_complete.is_set() else config.WORKER_COLD_SHUTDOWN

    monitor.stop_task_monitoring_thread()
    worker.stop_worker(now=immediate)
    flower.stop_flower(now=immediate)

    # TODO: we should wait synchronously for everything to shutdown properly
    #       or do we not, and want to have a separate `wait_for_shutdown()`
    #       function?