# -*- coding: utf-8 -*-
|
|
import lektor.db # typing
|
|
from lektor.build_programs import BuildProgram
|
|
from lektor.builder import Artifact, Builder # typing
|
|
from lektor.pluginsystem import Plugin
|
|
from lektor.reporter import reporter
|
|
from lektor.sourceobj import SourceObject, VirtualSourceObject
|
|
from lektor.types.flow import Flow, FlowType
|
|
from lektor.utils import bool_from_string, build_url, prune_file_and_folder
|
|
|
|
from typing import \
|
|
NewType, NamedTuple, Tuple, Dict, Set, List, Optional, Iterator, Callable
|
|
|
|
# Virtual path marker for generated group pages. It is used to build the
# virtual source path of a group, registered (without the leading '@') as
# virtual path resolver, and matched as a substring when pruning artifacts.
VPATH = '@groupby'  # potentially unsafe. All matching entries are pruned.
|
|
|
|
|
|
# -----------------------------------
|
|
# Typing
|
|
# -----------------------------------
|
|
# lektor model data-field value
FieldValue = NewType('FieldValue', object)
# attribute of lektor model
AttributeKey = NewType('AttributeKey', str)
# key of group-by
GroupKey = NewType('GroupKey', str)
|
|
|
|
|
|
class FieldKeyPath(NamedTuple):
    '''
    Location of a single value inside a record.
    Plain model fields only set `fieldKey`. Values taken from a flow block
    additionally carry the block index and the field name inside the block.
    '''
    fieldKey: str  # name of the field on the record
    flowIndex: Optional[int] = None  # position of the flow block, if any
    flowKey: Optional[str] = None  # field name within the flow block, if any
|
|
|
|
|
|
class GroupByCallbackArgs(NamedTuple):
    ''' Single argument handed to a grouping callback. '''
    record: lektor.db.Record  # record the field belongs to
    key: FieldKeyPath  # where inside the record the value was found
    field: FieldValue  # the value itself
|
|
|
|
|
|
class GroupByCallbackYield(NamedTuple):
    ''' One (group-key, extra-info) pair produced by a grouping callback. '''
    key: GroupKey  # group the record is assigned to
    extra: object  # arbitrary user data stored alongside the record
|
|
|
|
|
|
# Signature of a user supplied grouping function:
# (record, field-key, field) -> iterator of (group-key, extra-info)
GroupingCallback = Callable[
    [GroupByCallbackArgs], Iterator[GroupByCallbackYield]
]
|
|
|
|
|
|
class GroupProducer(NamedTuple):
    ''' Registration entry created for each watched (root, attribute). '''
    attribute: AttributeKey  # model attribute that marks relevant fields
    func: GroupingCallback  # user callback which assigns group keys
    flatten: bool = True  # if False, dont explode FlowType
    template: Optional[str] = None  # None: GroupBySource picks its default
    slug: Optional[str] = None  # None: GroupBySource picks its default
|
|
|
|
|
|
class GroupComponent(NamedTuple):
    ''' Member of a group: the record plus the callback's extra-info. '''
    record: lektor.db.Record  # record which was assigned to the group
    extra: object  # extra-info the grouping callback returned for it
|
|
|
|
|
|
class UrlResolverConf(NamedTuple):
    ''' Values needed to recreate a GroupBySource from its url path. '''
    attribute: AttributeKey  # attribute the group was built for
    group: GroupKey  # key of the group
    slug: Optional[str] = None  # slug the group source was created with
|
|
|
|
|
|
# -----------------------------------
|
|
# Actual logic
|
|
# -----------------------------------
|
|
|
|
|
|
class GroupBySource(VirtualSourceObject):
    '''
    Holds information for a single group/cluster.
    This object is accessible in your template file.
    Attributes: record, attribute, group, children, template, slug

    :DEFAULTS:
     template: "groupby-{attribute}.html" (attribute name inserted)
         slug: "{attrib}/{group}/index.html"
    '''

    def __init__(
        self,
        record: lektor.db.Record,
        attribute: AttributeKey,
        group: GroupKey,
        children: Optional[List[GroupComponent]] = None,
        template: Optional[str] = None,  # default: "groupby-{attribute}.html"
        slug: Optional[str] = None  # default: "{attrib}/{group}/index.html"
    ):
        '''
        Create the virtual source for one group below `record`.

        :param record: root record the group is attached to
        :param attribute: attribute name the grouping was performed on
        :param group: key of this group
        :param children: members of the group; a fresh empty list if omitted
        :param template: template name, falls back to the attribute default
        :param slug: url pattern; "{attrib}" and "{group}" are substituted
        '''
        super().__init__(record)
        self.attribute = attribute
        self.group = group
        # A new list per instance. A `[]` default would be shared by all
        # instances which were created without explicit children.
        self.children = children if children is not None else []
        self.template = template or 'groupby-{}.html'.format(self.attribute)
        # custom user path
        slug = slug or '{attrib}/{group}/index.html'
        slug = slug.replace('{attrib}', self.attribute)
        slug = slug.replace('{group}', self.group)
        if slug.endswith('/index.html'):
            # drop the file name but keep the trailing slash
            slug = slug[:-len('index.html')]
        self.slug = slug

    @property
    def path(self) -> str:
        ''' Virtual path: record path + VPATH + attribute + group. '''
        # Used in VirtualSourceInfo, used to prune VirtualObjects
        return build_url([self.record.path, VPATH, self.attribute, self.group])

    @property
    def url_path(self) -> str:
        ''' Url of the group page: record path joined with the slug. '''
        return build_url([self.record.path, self.slug])

    def iter_source_filenames(self) -> Iterator[str]:
        ''' Yield the source filenames of all child records. '''
        for record, _ in self.children:
            yield from record.iter_source_filenames()

    def __str__(self) -> str:
        ''' Debug representation listing the main attributes. '''
        attrs = ''.join(
            ' {}="{}"'.format(x, getattr(self, x))
            for x in ('attribute', 'group', 'template', 'slug'))
        return '<GroupBySource{} children={}>'.format(
            attrs, len(self.children))
|
|
|
|
|
|
class GroupByBuildProgram(BuildProgram):
    ''' Generates Build-Artifacts and write files. '''

    def produce_artifacts(self) -> None:
        ''' Declare the artifact of the group page and track its url. '''
        target = self.source.url_path
        # directory-like urls are written to their index file
        artifact_url = target + 'index.html' if target.endswith('/') else target
        source_files = list(self.source.iter_source_filenames())
        self.declare_artifact(artifact_url, sources=source_files)
        # remember the url so the pruner keeps this artifact
        GroupByPruner.track(artifact_url)

    def build_artifact(self, artifact: Artifact) -> None:
        ''' Render the template of the source into the artifact. '''
        group_source = self.source
        group_source.pad.db.track_record_dependency(group_source)
        artifact.render_template_into(group_source.template, this=group_source)
|
|
|
|
|
|
# -----------------------------------
|
|
# Helper
|
|
# -----------------------------------
|
|
|
|
|
|
class GroupByPruner:
    '''
    Static collector for build-artifact urls.
    All non-tracked VPATH-urls will be pruned after build.
    '''
    # Note: this var is static or otherwise two instances of
    # GroupByCreator would prune each others artifacts.
    _cache: Set[str] = set()

    @classmethod
    def track(cls, url: str) -> None:
        ''' Remember url (without leading slashes) as generated. '''
        normalized = url.lstrip('/')
        cls._cache.add(normalized)

    @classmethod
    def prune(cls, builder: Builder) -> None:
        ''' Remove previously generated, unreferenced Artifacts. '''
        output_root = builder.destination_path
        connection = builder.connect_to_database()
        try:
            with builder.new_build_state() as state:
                for url, artifact_file in state.iter_artifacts():
                    if url.lstrip('/') in cls._cache:
                        continue  # generated in this build-run
                    dependencies = state.get_artifact_dependency_infos(url, [])
                    # we only care about groupby Virtuals; a single matching
                    # VPATH-entry is enough to prune the artifact
                    if not any(VPATH in v_path for v_path, _ in dependencies):
                        continue
                    reporter.report_pruned_artifact(url)
                    prune_file_and_folder(artifact_file.filename, output_root)
                    state.remove_artifact(url)
        finally:
            connection.close()
        # start the next build-run with an empty set of tracked urls
        cls._cache.clear()
|
|
|
|
|
|
# -----------------------------------
|
|
# Main Component
|
|
# -----------------------------------
|
|
|
|
|
|
class GroupByCreator:
    '''
    Process all children with matching conditions under specified page.
    Creates a grouping of pages with similar (self-defined) attributes.
    The grouping is performed only once per build (or manually invoked).
    '''

    def __init__(self):
        # attribute -> flow-block id -> names of fields flagged with attribute
        self._flows: Dict[AttributeKey, Dict[str, Set[str]]] = {}
        # attribute -> model id -> field name -> '*' (whole field) or
        # '?' (flow field where only some blocks are relevant)
        self._models: Dict[AttributeKey, Dict[str, Dict[str, str]]] = {}
        # watched root url path -> registered producers
        self._func: Dict[str, Set[GroupProducer]] = {}
        self._resolve_map: Dict[str, UrlResolverConf] = {}  # only for server

    # --------------
    #   Initialize
    # --------------

    def initialize(self, db: lektor.db.Database) -> None:
        ''' Reset cached lookups and re-scan models for all watched attribs '''
        self._flows.clear()
        self._models.clear()
        self._resolve_map.clear()
        for prod_list in self._func.values():
            for producer in prod_list:
                self._register(db, producer.attribute)

    def _register(self, db: lektor.db.Database, attrib: AttributeKey) -> None:
        ''' Preparation: find models and flow-models which contain attrib '''
        if attrib in self._flows or attrib in self._models:
            return  # already added
        # find flow blocks with attrib
        _flows = {}  # type: Dict[str, Set[str]]
        for key, flow in db.flowblocks.items():
            flagged = {f.name for f in flow.fields
                       if bool_from_string(f.options.get(attrib, False))}
            if flagged:
                _flows[key] = flagged
        # find models with attrib or flow-blocks containing attrib
        _models = {}  # type: Dict[str, Dict[str, str]]
        for key, model in db.datamodels.items():
            fields = {}  # type: Dict[str, str]
            for field in model.fields:
                if bool_from_string(field.options.get(attrib, False)):
                    fields[field.name] = '*'  # include all children
                elif isinstance(field.type, FlowType):
                    allowed = field.type.flow_blocks
                    # NOTE(review): presumably `flow_blocks` is None when the
                    # field does not restrict its block types; iterating it
                    # would raise TypeError. Treat None as "any block may
                    # occur" - confirm against lektor FlowType.
                    if _flows and (allowed is None
                                   or any(x in _flows for x in allowed)):
                        fields[field.name] = '?'  # only some flow blocks
            if fields:
                _models[key] = fields

        self._flows[attrib] = _flows
        self._models[attrib] = _models

    # ----------------
    #   Add Observer
    # ----------------

    def watch(
        self,
        root: str,
        attrib: AttributeKey, *,
        flatten: bool = True,  # if False, dont explode FlowType
        template: Optional[str] = None,  # default: "groupby-{attrib}.html"
        slug: Optional[str] = None  # default: "{attrib}/{group}/index.html"
    ) -> Callable[[GroupingCallback], None]:
        '''
        Decorator to subscribe to attrib-elements. Converter for groupby().
        Refer to groupby() for further details.

        (record, field-key, field) -> (group-key, extra-info)

        The decorator only registers the function and returns None, so the
        decorated name is not bound to the function afterwards.

        :DEFAULTS:
         template: "groupby-{attrib}.html" (attribute name inserted)
             slug: "{attrib}/{group}/index.html"
        '''
        def _decorator(fn: GroupingCallback) -> None:
            self._func.setdefault(root, set()).add(
                GroupProducer(attrib, fn, flatten, template, slug))

        return _decorator

    # ----------
    #   Helper
    # ----------

    def iter_record_fields(
        self,
        source: lektor.db.Record,
        attrib: AttributeKey,
        flatten: bool = False
    ) -> Iterator[Tuple[FieldKeyPath, FieldValue]]:
        '''
        Enumerate all fields of a lektor.db.Record with attrib = True

        With flatten, a flagged flow field is exploded into the fields of its
        blocks (keys starting with "_" are skipped). Without flatten the flow
        value is yielded as a whole.
        '''
        assert isinstance(source, lektor.db.Record)
        _flows = self._flows.get(attrib, {})
        _models = self._models.get(attrib, {})

        for r_key, subs in _models.get(source.datamodel.id, {}).items():
            field = source[r_key]
            if subs == '*':  # either normal field or flow type (all blocks)
                if flatten and isinstance(field, Flow):
                    for i, flow in enumerate(field.blocks):
                        for f_key, block in flow._data.items():
                            if f_key.startswith('_'):  # e.g., _flowblock
                                continue
                            yield FieldKeyPath(r_key, i, f_key), block
                else:
                    yield FieldKeyPath(r_key), field
            elif isinstance(field, Flow):  # flow type (only some blocks)
                # Same Flow guard as in the branch above: a value which is
                # not a Flow has no blocks to inspect and is skipped.
                for i, flow in enumerate(field.blocks):
                    flowtype = flow['_flowblock']
                    for f_key in _flows.get(flowtype, []):
                        yield FieldKeyPath(r_key, i, f_key), flow[f_key]

    def groupby(
        self,
        attrib: AttributeKey,
        root: lektor.db.Record,
        func: GroupingCallback,
        flatten: bool = False,
        incl_attachments: bool = True
    ) -> Dict[GroupKey, List[GroupComponent]]:
        '''
        Traverse selected root record with all children and group by func.
        Func is called with (record, FieldKeyPath, FieldValue).
        Func may yield one or more (group-key, extra-info) tuples.

        return {'group-key': [(record, extra-info), ...]}
        '''
        assert callable(func), 'no GroupingCallback provided'
        assert isinstance(root, lektor.db.Record)
        tmap = {}  # type: Dict[GroupKey, List[GroupComponent]]
        pending = [root]  # type: List[lektor.db.Record]
        while pending:
            record = pending.pop()
            if hasattr(record, 'children'):
                pending.extend(record.children)
            if incl_attachments and hasattr(record, 'attachments'):
                pending.extend(record.attachments)
            for key, field in self.iter_record_fields(record, attrib, flatten):
                # func may return None instead of an iterable
                for ret in func(GroupByCallbackArgs(record, key, field)) or []:
                    assert isinstance(ret, (tuple, list)), \
                        'Must return tuple (group-key, extra-info)'
                    group_key, extras = ret
                    tmap.setdefault(group_key, []).append(
                        GroupComponent(record, extras))
        return tmap

    # -----------------
    #   Create groups
    # -----------------

    def should_process(self, node: SourceObject) -> bool:
        ''' Check if record path is being watched. '''
        return isinstance(node, lektor.db.Record) \
            and node.url_path in self._func

    def make_cluster(self, root: lektor.db.Record) -> Iterator[GroupBySource]:
        ''' Group by attrib and build Artifacts. '''
        assert isinstance(root, lektor.db.Record)
        for producer in self._func.get(root.url_path, []):
            groups = self.groupby(producer.attribute, root,
                                  func=producer.func,
                                  flatten=producer.flatten)
            for group_key, children in groups.items():
                obj = GroupBySource(root, producer.attribute, group_key,
                                    children, template=producer.template,
                                    slug=producer.slug)
                self.track_dev_server_path(obj)
                yield obj

    # ------------------
    #   Path resolving
    # ------------------

    def resolve_virtual_path(
        self, node: SourceObject, pieces: List[str]
    ) -> Optional[GroupBySource]:
        ''' Given a @VPATH/attrib/groupkey path, determine url path. '''
        if len(pieces) < 2:
            return None
        attrib = AttributeKey(pieces[0])
        group = GroupKey(pieces[1])
        for producer in self._func.get(node.url_path, []):
            if producer.attribute == attrib:
                # TODO: do we need to provide the template too?
                return GroupBySource(node, producer.attribute, group,
                                     slug=producer.slug)
        return None

    def track_dev_server_path(self, sender: GroupBySource) -> None:
        ''' Dev server only: Add target path to reverse artifact url lookup '''
        self._resolve_map[sender.url_path] = \
            UrlResolverConf(sender.attribute, sender.group, sender.slug)

    def resolve_dev_server_path(
        self, node: SourceObject, pieces: List[str]
    ) -> Optional[GroupBySource]:
        ''' Dev server only: Resolve actual url to virtual obj. '''
        prev = self._resolve_map.get(build_url([node.url_path] + pieces))
        if prev:
            attrib, group, slug = prev
            return GroupBySource(node, attrib, group, slug=slug)
        return None
|
|
|
|
|
|
# -----------------------------------
|
|
# Plugin Entry
|
|
# -----------------------------------
|
|
|
|
|
|
class GroupByPlugin(Plugin):
    ''' Plugin entry: wires GroupByCreator into the lektor environment. '''
    name = 'GroupBy Plugin'
    description = 'Cluster arbitrary records with field attribute keyword.'

    def on_setup_env(self, **extra):
        ''' Create the creator and register build program and resolvers. '''
        self.creator = GroupByCreator()
        self.env.add_build_program(GroupBySource, GroupByBuildProgram)
        # let other plugins register their @groupby.watch functions
        self.emit('init', groupby=self.creator)

        # resolve /tag/rss/ -> /tag/rss/index.html (local server only)
        @self.env.urlresolver
        def groupby_path_resolver(node, pieces):
            if not self.creator.should_process(node):
                return None
            return self.creator.resolve_dev_server_path(node, pieces)

        # use VPATH in templates: {{ '/@groupby/attrib/group' | url }}
        @self.env.virtualpathresolver(VPATH.lstrip('@'))
        def groupby_virtualpath_resolver(node, pieces):
            if not self.creator.should_process(node):
                return None
            return self.creator.resolve_virtual_path(node, pieces)

        # injection to generate GroupBy nodes when processing artifacts
        @self.env.generator
        def groupby_generator(node):
            if not self.creator.should_process(node):
                return
            yield from self.creator.make_cluster(node)

    def on_before_build_all(self, builder, **extra):
        ''' Prepare the creator before all sources are built. '''
        # parse all models to detect attribs of listeners
        database = builder.pad.db
        self.creator.initialize(database)

    def on_after_prune(self, builder, **extra):
        ''' Remove group artifacts which were not generated in this run. '''
        # TODO: find better way to prune unreferenced elements
        GroupByPruner.prune(builder)
|