Files
2022-12-08 00:32:37 +01:00

78 lines
2.6 KiB
Python

'''
Usage:
  1. In `VirtualSourceObject.produce_artifacts()`:
     -> remember the artifact url and later supply all remembered urls
        to `prune()` as `current_urls`
  2. In `VirtualSourceObject.build_artifact()`:
     -> call `get_ctx().record_virtual_dependency(VirtualPruner())`
'''
from lektor.reporter import reporter # report_pruned_artifact
from lektor.sourceobj import VirtualSourceObject # subclass
from lektor.utils import prune_file_and_folder
import os
from typing import TYPE_CHECKING, Set, List, Iterable
if TYPE_CHECKING:
from lektor.builder import Builder
from sqlite3 import Connection
class VirtualPruner(VirtualSourceObject):
    '''
    Marker source object: signals that a generated VirtualSourceObject
    supports pruning.

    Meant to be recorded as a virtual dependency of an artifact (see the
    usage notes in the module docstring). Prunable artifacts are later
    looked up by `VPATH` in the `source` column of the `artifacts` table.
    '''

    # Virtual path shared by every instance; doubles as the database key.
    VPATH = '/@VirtualPruner'

    def __init__(self) -> None:
        # NOTE: the parent initializer is not invoked; only the path is set.
        # If ever needed, a suffix variable could be appended to the path.
        self._path = VirtualPruner.VPATH

    @property
    def path(self) -> str:  # type: ignore[override]
        ''' Virtual path of this object (currently always `VPATH`). '''
        return self._path
def prune(builder: 'Builder', current_urls: Iterable[str]) -> None:
    '''
    Remove previously generated, but now unreferenced artifacts.

    Artifacts stored in the build database with the VirtualPruner source
    are compared against `current_urls`. Everything that is no longer
    listed is reported, deleted below the destination folder, and finally
    removed from the database.

    :param builder: supplies `destination_path` and the db connection.
    :param current_urls: urls of all artifacts which must be kept.
    '''
    output_root = builder.destination_path
    db = builder.connect_to_database()
    try:
        # known prunable artifacts minus the ones that are still in use
        stale = _query_prunable(db) - _normalize_urls(current_urls)
        for name in stale:
            reporter.report_pruned_artifact(name)  # type: ignore
            # artifact names are '/'-separated, translate to a local path
            local_path = os.path.join(
                output_root, name.strip('/').replace('/', os.path.sep))
            prune_file_and_folder(local_path, output_root)
        # only reached if nothing above raised:
        # drop the obsolete references from the database as well
        _prune_db_artifacts(db, list(stale))
    finally:
        # the connection is closed regardless of success or failure
        db.close()
# ---------------------------
# Internal helper methods
# ---------------------------
def _normalize_urls(urls: Iterable[str]) -> Set[str]:
    '''
    Return the set of normalized urls.

    A url ending in '/' gets 'index.html' appended; afterwards all leading
    slashes are stripped. Duplicates collapse because the result is a set.
    '''
    return {
        (url + 'index.html' if url.endswith('/') else url).lstrip('/')
        for url in urls
    }
def _query_prunable(conn: 'Connection') -> Set[str]:
    '''
    Collect all artifact names that have the VirtualPruner dependency.

    Reads the `artifact` column of every row in the `artifacts` table
    whose `source` equals `VirtualPruner.VPATH`.
    '''
    rows = conn.cursor().execute(
        'SELECT artifact FROM artifacts WHERE source = ?',
        [VirtualPruner.VPATH]).fetchall()
    # each row is a one-element tuple, unpack it into the plain name
    return {name for (name,) in rows}
def _prune_db_artifacts(conn: 'Connection', urls: List[str]) -> None:
    '''
    Delete obsolete artifact references from the database.

    Every row of the `artifacts` table whose `artifact` value is listed in
    `urls` is removed. The deletes are issued in chunks so that a single
    statement never binds more than 999 variables; one commit is performed
    at the very end (also if `urls` is empty).
    '''
    chunk_size = 999  # Default SQLITE_MAX_VARIABLE_NUMBER.
    cursor = conn.cursor()
    offset = 0
    while offset < len(urls):
        chunk = urls[offset:offset + chunk_size]
        # one '?' placeholder per url in this chunk
        placeholders = ','.join('?' for _ in chunk)
        cursor.execute(
            'DELETE FROM artifacts WHERE artifact in ({})'.format(
                placeholders),
            chunk)
        offset += chunk_size
    conn.commit()