import os
import re
import shutil
from html import unescape
from os.path import exists, join
from pathlib import Path
from urllib.parse import unquote, urlparse
import duct
from celery import chain, group, shared_task
from loguru import logger
from magpie.config import CACHE_DIR
from magpie.datamodel import Folder, Twig
# Options passed to every `wget` invocation used to mirror a page into the
# local cache (see `cache_url_duct`). Commented-out entries are alternatives
# that were considered and set aside; keep them for reference.
WGET_OPTS = [
    '--span-hosts', # -H, do not limit download of assets to page host
    '--adjust-extension', # -E, adjust extension for CGI-generated material
    '--trust-server-names', # for dynamic files, use name given by server
    # '--content-disposition', # same as above but `--trust-server-names` seems to be the better option
    '--convert-links', # -k, convert links on the page to point to the ones downloaded
    '--page-requisites', # -p, download all necessary resources to properly display page
    # '--timestamping', # -N, do not re-downloaded files that haven't been modified
    '--execute', 'robots=off', # ignore robots.txt
    # '--backup-converted', # -K, backup original file when converting a file
    # '--restrict-file-names=nocontrol',
]
@shared_task
def cache_url_duct(url: str):
    """Celery task: download *url* and its page requisites into CACHE_DIR via wget.

    wget's stderr is folded into stdout and only traced; a non-zero exit status
    is not raised (`unchecked()`), so a failed download is best-effort.
    """
    logger.info(f'Cache URL: {url}')
    cmd = [
        'wget',
        *WGET_OPTS,
        url
    ]
    # Double quotes around the f-string: re-using the same quote type inside an
    # f-string expression is a SyntaxError before Python 3.12 (PEP 701).
    logger.trace(f"Running command: {' '.join(cmd)}")
    r = duct.cmd(*cmd).dir(CACHE_DIR).stderr_to_stdout().unchecked().read()
    logger.trace(r)
def cache_urls_in_folder(folder: Folder, cache_url):
    """Recursively walk *folder*, calling *cache_url* on every Twig's URL.

    Raises:
        TypeError: if an entry is neither a Folder nor a Twig.
    """
    for entry in folder:
        if isinstance(entry, Folder):
            # descend into sub-folders
            cache_urls_in_folder(entry, cache_url)
            continue
        if not isinstance(entry, Twig):
            raise TypeError(f'Invalid type for caching item: {entry} (type={type(entry)})')
        cache_url(entry.url)
        logger.warning(f'cache_url {entry.url} returns')
def url_to_cached_url(url: str) -> str:
    """Map a remote URL onto the `file://` URL of its cached copy on disk.

    Prefers `<path>/index.html`, then `<path>.html` (wget's
    `--adjust-extension` output), then the bare path.
    """
    parsed = urlparse(url)
    logger.debug(parsed)
    # Substitute the cache directory for the scheme+host, then strip the
    # leading '//' that geturl() emits for a netloc-only URL.
    cached_file = parsed._replace(scheme='', netloc=f'{CACHE_DIR}/{parsed.netloc}').geturl()[2:]
    logger.debug(cached_file)
    index_file = Path(cached_file) / 'index.html'
    if exists(index_file):
        return 'file://' + str(index_file)
    if exists(cached_file + '.html'):
        return 'file://' + cached_file + '.html'
    return 'file://' + cached_file
def clear_cache():
    """Delete the cache directory and recreate it empty.

    `ignore_errors=True` makes this idempotent: clearing a cache directory
    that does not exist yet (e.g. first run, or called twice) is not an
    error — the original raised FileNotFoundError in that case.
    """
    shutil.rmtree(CACHE_DIR, ignore_errors=True)
    os.makedirs(CACHE_DIR, exist_ok=True)
def refresh_cache_celery(collection: Folder):
    """Queue Celery tasks caching every URL in *collection*, then fix links."""
    logger.debug(collection.as_text())
    pending = []

    def collect(url):
        # NOTE(review): assumes the url object carries its string in `.value` — confirm
        pending.append(url.value)

    # this just builds the full list of URLs to cache by traversing our collection
    # and adding those URLs to the `pending` variable
    cache_urls_in_folder(collection, collect)
    # TODO: notify progress in status bar
    # use the `run_task()` function in `magpie.task.queue` to do so
    caching = group(cache_url_duct.s(u) for u in pending)
    workflow = chain(caching, fix_extensionless_links.s())
    workflow()
@shared_task
def fix_extensionless_links(unused=None): # we need the `unused` param to be able to chain it after the caching tasks
    """This is similar to what `--adjust-extension` does for wget, except that
    wget only does it for html, css and a few other files.

    Walks every ``.html`` file under CACHE_DIR, finds ``src`` links whose
    cached file lacks the extension implied by the URL path (because the URL
    had a query string), renames the file and rewrites the link in place.
    """
    logger.info("Fix dynamic links that `--content-disposition` didn't")
    rexp = re.compile(r'src="([^"#]*)') # find links in src html attribute and drop the fragment part
    for root, _dirs, files in os.walk(CACHE_DIR):
        for fname in files:
            # BUG FIX: the original tested `.endswith('.html')` twice
            # (copy-paste); a single check is the effective behavior.
            if not fname.endswith('.html'):
                continue
            abspath = join(root, fname)
            with open(abspath) as file:
                html = file.read()
            matches = rexp.findall(html)
            if not matches:
                continue
            logger.debug(f'Found potential links to replace in file: {abspath}')
            # find list of links to replace (and corresponding files to rename)
            replace, rename = _collect_extension_fixes(matches, root)
            if not replace:
                continue
            # rename files
            for orig, fixed in rename.items():
                logger.debug(f'rename\n{orig} to\n{fixed}')
                os.rename(orig, fixed)
            # update the refs in the html file
            for orig, fixed in replace.items():
                logger.debug(f'replaced\n{orig} with\n{fixed}')
                html = html.replace(orig, fixed)
            # save updated html file (note: do not shadow the loop variable)
            with open(abspath, 'w') as out:
                out.write(html)


def _collect_extension_fixes(matches, root):
    """Scan raw ``src`` attribute values and decide what needs fixing.

    Returns a pair of dicts: ``replace`` maps original link text to link text
    with the extension appended, ``rename`` maps the on-disk file to its new
    name. Only links with a query part, a derivable extension, and an existing
    cached file are included.
    """
    replace = {}
    rename = {}
    for target_match in matches:
        target = unescape(unquote(target_match))
        # we found a target link, check to see if we need to fix its extension
        if '?' not in target:
            # no query part: we're all good
            continue
        # we have a query part, split it there, get the extension from the file
        # being served and add it to the total filename on the filesystem
        ext = Path(target.split('?')[0]).suffix
        if not ext:
            # no extension, nothing to do
            continue
        if target.endswith(ext):
            # extension already appended to filename, nothing to do
            continue
        logger.debug(f'- {target}')
        target_file = join(root, target)
        if exists(target_file):
            replace[target_match] = target_match + ext
            rename[target_file] = target_file + ext
    return replace, rename
def test_cache_url():
    """Manual end-to-end check: cache one page through the Celery queue, then fix links."""
    import time

    # NOTE(review): `cache_url` is not defined in this module (only
    # `cache_url_duct` is) — confirm magpie.tasks actually exports it.
    from magpie.tasks import cache_url
    logger.info('░▒▓ TEST CACHE URL USING CELERY TASK QUEUE ▓▒░')
    url = 'https://docs.astral.sh/uv/'
    result = cache_url.delay(url)
    # poll until the async task completes
    while not result.ready():
        logger.info('...')
        time.sleep(1)
    logger.info("Fix dynamic links that `--content-disposition` didn't")
    fix_extensionless_links()
    # test: check that cached page displays the bar graph at the top