78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
'''
|
|
Usage:
|
|
VirtualSourceObject.produce_artifacts()
|
|
-> remember url and later supply as `current_urls`
|
|
VirtualSourceObject.build_artifact()
|
|
-> `get_ctx().record_virtual_dependency(VirtualPruner())`
|
|
'''
|
|
from lektor.reporter import reporter # report_pruned_artifact
|
|
from lektor.sourceobj import VirtualSourceObject # subclass
|
|
from lektor.utils import prune_file_and_folder
|
|
import os
|
|
from typing import TYPE_CHECKING, Set, List, Iterable
|
|
if TYPE_CHECKING:
|
|
from lektor.builder import Builder
|
|
from sqlite3 import Connection
|
|
|
|
|
|
class VirtualPruner(VirtualSourceObject):
|
|
''' Indicate that a generated VirtualSourceObject has pruning support. '''
|
|
VPATH = '/@VirtualPruner'
|
|
|
|
def __init__(self) -> None:
|
|
self._path = VirtualPruner.VPATH # if needed, add suffix variable
|
|
|
|
@property
|
|
def path(self) -> str: # type: ignore[override]
|
|
return self._path
|
|
|
|
|
|
def prune(builder: 'Builder', current_urls: Iterable[str]) -> None:
|
|
''' Removes previously generated, but now unreferenced Artifacts. '''
|
|
dest_dir = builder.destination_path
|
|
con = builder.connect_to_database()
|
|
try:
|
|
previous = _query_prunable(con)
|
|
current = _normalize_urls(current_urls)
|
|
to_be_pruned = previous.difference(current)
|
|
for file in to_be_pruned:
|
|
reporter.report_pruned_artifact(file) # type: ignore
|
|
prune_file_and_folder(os.path.join(
|
|
dest_dir, file.strip('/').replace('/', os.path.sep)), dest_dir)
|
|
# if no exception raised, update db to remove obsolete references
|
|
_prune_db_artifacts(con, list(to_be_pruned))
|
|
finally:
|
|
con.close()
|
|
|
|
|
|
# ---------------------------
|
|
# Internal helper methods
|
|
# ---------------------------
|
|
|
|
def _normalize_urls(urls: Iterable[str]) -> Set[str]:
|
|
cache = set()
|
|
for url in urls:
|
|
if url.endswith('/'):
|
|
url += 'index.html'
|
|
cache.add(url.lstrip('/'))
|
|
return cache
|
|
|
|
|
|
def _query_prunable(conn: 'Connection') -> Set[str]:
|
|
''' Query database for artifacts that have the VirtualPruner dependency '''
|
|
cur = conn.cursor()
|
|
cur.execute('SELECT artifact FROM artifacts WHERE source = ?',
|
|
[VirtualPruner.VPATH])
|
|
return set(x for x, in cur.fetchall())
|
|
|
|
|
|
def _prune_db_artifacts(conn: 'Connection', urls: List[str]) -> None:
|
|
''' Remove obsolete artifact references from database. '''
|
|
MAX_VARS = 999 # Default SQLITE_MAX_VARIABLE_NUMBER.
|
|
cur = conn.cursor()
|
|
for i in range(0, len(urls), MAX_VARS):
|
|
batch = urls[i: i + MAX_VARS]
|
|
cur.execute('DELETE FROM artifacts WHERE artifact in ({})'.format(
|
|
','.join(['?'] * len(batch))), batch)
|
|
conn.commit()
|