Source code for magpie.logging

import inspect
import logging
import sys
from pathlib import Path

from celery.signals import setup_logging
from loguru import logger

from magpie import config


# see: https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging
class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        # Get the corresponding loguru level if it exists.
        level: str | int
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find the caller from which the logged message originated.
        frame, depth = inspect.currentframe(), 0
        while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
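# Illustrative sketch (comments only; the logger name 'some.library' is
# hypothetical): once the handler is installed on the root logger, as the
# signal receiver below does, every stdlib logging call is re-emitted
# through loguru and attributed to the original call site:
#
#   logging.getLogger('some.library').warning('cache miss for %s', 'key1')
#   # -> forwarded to loguru at WARNING with the caller's module:function:line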
@setup_logging.connect
def on_setup_logging(*args, **kwargs):
    # route all stdlib logging through loguru; quiet trafilatura down to INFO
    logging.basicConfig(handlers=[InterceptHandler()], level=0)
    logging.getLogger('trafilatura').setLevel(logging.INFO)
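# Design note: Celery emits the setup_logging signal while configuring its
# loggers and skips its own logging setup when a receiver is connected, so
# the interception above stays the single point of control. Other noisy
# third-party loggers could be quieted the same way, e.g. (hypothetical):
#
#   logging.getLogger('urllib3').setLevel(logging.WARNING)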
FORMAT_BEGIN = (
    "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
    "<level>{level: <8}</level>"
)
FORMAT_END = (
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>\n"
)
def custom_formatter(record):
    if task_id:
        # note: leaving this commented here to have an example of getting a worker id if needed
        # from billiard.process import current_process
        # p = current_process()
        # worker = f'[worker{p.index} {p.name} {p.pid}]'
        return f'{FORMAT_BEGIN} | {task_id} | {FORMAT_END}'
    else:
        return f'{FORMAT_BEGIN} | {FORMAT_END}'
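# Illustrative output (all values hypothetical, color tags stripped): with a
# task id bound, a line renders as
#
#   2024-01-01 12:00:00.000 | INFO     | a1b2c3d4 | magpie.fetch:run:42 - fetched page
#
# and without one as
#
#   2024-01-01 12:00:00.000 | INFO     | magpie.fetch:run:42 - fetched page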
def configure():
    logger.remove(0)  # remove the default handler
    levels = {}
    for modlevel in config.MAGPIE_LOG_LEVEL.split(','):
        ml = modlevel.split('=')
        if len(ml) == 1:
            # default log level
            levels[''] = ml[0]
        elif len(ml) == 2:
            # module-specific log level
            levels[ml[0]] = ml[1]
        else:
            raise ValueError(f'Cannot parse log levels: {config.MAGPIE_LOG_LEVEL}')
    logger.add(sys.stderr, format=custom_formatter, filter=levels, level=0)
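# Example parse (the config value is hypothetical): MAGPIE_LOG_LEVEL set to
# 'WARNING,magpie.fetch=DEBUG' yields {'': 'WARNING', 'magpie.fetch': 'DEBUG'},
# which loguru accepts as a dict filter: '' is the default minimum level and
# each named key overrides it for that module and its submodules.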
def logfile_for_task(task_id):
    return Path(config.LOG_DIR) / f'task_{task_id}.log'
def recent_task_logs(count: int = 5):
    """Return the task IDs for the most recent task logs."""
    logfiles = Path(config.LOG_DIR).glob('task_*.log')
    # sort newest-first by modification time so the most recent logs come out on top
    logfiles = sorted(((f.stat().st_mtime, f) for f in logfiles), reverse=True)
    return [f.stem[5:] for (_mtime, f) in logfiles[:count]]
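# Example (file names hypothetical): if LOG_DIR holds task_aaa.log (older)
# and task_bbb.log (newer), recent_task_logs(1) returns ['bbb'], since the
# listing is sorted newest-first by modification time.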
# task id singleton (one per process)
# this is safe as long as we stay with the prefork model in Celery
# see: https://docs.celeryq.dev/en/latest/userguide/concurrency/index.html
task_id = None

# id of the per-task logger handler, kept so we can remove it when we're done with it
task_log_file_handle = None

# configure logging upon import of this module
configure()
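# Sketch of the intended per-task lifecycle (hypothetical; the actual task
# hooks live outside this module): a worker binds the globals when a task
# starts and tears the file sink down when it ends.
#
#   import magpie.logging as mlog
#   from loguru import logger
#
#   def on_task_start(current_task_id):
#       mlog.task_id = current_task_id
#       mlog.task_log_file_handle = logger.add(mlog.logfile_for_task(current_task_id))
#
#   def on_task_end():
#       logger.remove(mlog.task_log_file_handle)
#       mlog.task_log_file_handle = None
#       mlog.task_id = None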