complete refactoring to ensure group before build

Due to a concurrency bug (lektor/lektor#1017), a source file is
sporadically not updated because `before-build` is evaluated faster
than `before-build-all`. Fixed with a redundant build process.

Also:
- adds before- and after-init hooks
- encapsulates logic into separate classes
- fix virtual path and remove virtual path resolver
- more type hints (incl. bugfixes)
This commit is contained in:
relikd
2022-03-31 04:16:18 +02:00
parent dfdf55a5a5
commit c9c1ab69b1
3 changed files with 299 additions and 424 deletions

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
import lektor.db # typing
from lektor.db import Database, Record # typing
from lektor.build_programs import BuildProgram
from lektor.builder import Artifact, Builder # typing
from lektor.pluginsystem import Plugin
@@ -10,8 +10,8 @@ from lektor.utils import bool_from_string, build_url, prune_file_and_folder
# for quick config
from lektor.utils import slugify
from typing import \
NewType, NamedTuple, Tuple, Dict, Set, List, Optional, Iterator, Callable
from typing import Tuple, Dict, Set, List, NamedTuple
from typing import NewType, Optional, Iterator, Callable, Iterable
VPATH = '@groupby' # potentially unsafe. All matching entries are pruned.
@@ -19,11 +19,16 @@ VPATH = '@groupby' # potentially unsafe. All matching entries are pruned.
# -----------------------------------
# Typing
# -----------------------------------
FieldValue = NewType('FieldValue', object) # lektor model data-field value
AttributeKey = NewType('AttributeKey', str) # attribute of lektor model
GroupKey = NewType('GroupKey', str) # key of group-by
class ResolverConf(NamedTuple):
attrib: AttributeKey
group: GroupKey
slug: str
class FieldKeyPath(NamedTuple):
fieldKey: str
flowIndex: Optional[int] = None
@@ -31,36 +36,17 @@ class FieldKeyPath(NamedTuple):
class GroupByCallbackArgs(NamedTuple):
record: lektor.db.Record
record: Record
key: FieldKeyPath
field: FieldValue
class GroupByCallbackYield(NamedTuple):
key: GroupKey
extra: object
field: object # lektor model data-field value
GroupingCallback = Callable[[GroupByCallbackArgs],
Iterator[GroupByCallbackYield]]
class GroupProducer(NamedTuple):
attribute: AttributeKey
func: GroupingCallback
flatten: bool = True
slug: Optional[str] = None
template: Optional[str] = None
dependency: Optional[str] = None
class GroupComponent(NamedTuple):
record: lektor.db.Record
extra: object
Iterator[Tuple[GroupKey, object]]]
# -----------------------------------
# Actual logic
# VirtualSource & BuildProgram
# -----------------------------------
@@ -68,61 +54,62 @@ class GroupBySource(VirtualSourceObject):
'''
Holds information for a single group/cluster.
This object is accessible in your template file.
Attributes: record, attribute, group, children, template, slug
Attributes: record, attrib, group, slug, template, children
:DEFAULTS:
template: "groupby-attribute.html"
slug: "{attrib}/{group}/index.html"
template: "groupby-attribute.html"
'''
def __init__(
self,
record: lektor.db.Record,
attribute: AttributeKey,
group: GroupKey,
children: List[GroupComponent] = [],
record: Record,
attrib: AttributeKey,
group: GroupKey, *,
slug: Optional[str] = None, # default: "{attrib}/{group}/index.html"
template: Optional[str] = None, # default: "groupby-attribute.html"
dependency: Optional[str] = None
):
template: Optional[str] = None # default: "groupby-attrib.html"
) -> None:
super().__init__(record)
self.attribute = attribute
self.attrib = attrib
self.group = group
self.children = children
self.template = template or 'groupby-{}.html'.format(self.attribute)
self.dependency = dependency
self.template = template or 'groupby-{}.html'.format(self.attrib)
# custom user path
slug = slug or '{attrib}/{group}/index.html'
slug = slug.replace('{attrib}', self.attribute)
slug = slug.replace('{attrib}', self.attrib)
slug = slug.replace('{group}', self.group)
if slug.endswith('/index.html'):
slug = slug[:-10]
self.slug = slug
# user adjustable after init
self.children = {} # type: Dict[Record, List[object]]
self.dependencies = set() # type: Set[str]
@property
def path(self) -> str:
# Used in VirtualSourceInfo, used to prune VirtualObjects
return build_url([self.record.path, VPATH, self.attribute, self.group])
return f'{self.record.path}{VPATH}/{self.attrib}/{self.group}'
@property
def url_path(self) -> str:
# Actual path to resource as seen by the browser
return build_url([self.record.path, self.slug])
def iter_source_filenames(self) -> Iterator[str]:
if self.dependency:
yield self.dependency
for record, _ in self.children:
''' Enumerate all dependencies '''
if self.dependencies:
yield from self.dependencies
for record in self.children:
yield from record.iter_source_filenames()
def __str__(self) -> str:
txt = '<GroupBySource'
for x in ['attribute', 'group', 'template', 'slug']:
for x in ['attrib', 'group', 'slug', 'template']:
txt += ' {}="{}"'.format(x, getattr(self, x))
return txt + ' children={}>'.format(len(self.children))
class GroupByBuildProgram(BuildProgram):
''' Generates Build-Artifacts and write files. '''
''' Generate Build-Artifacts and write files. '''
def produce_artifacts(self) -> None:
url = self.source.url_path
@@ -153,6 +140,7 @@ class GroupByPruner:
@classmethod
def track(cls, url: str) -> None:
''' Add url to build cache to prevent pruning. '''
cls._cache.add(url.lstrip('/'))
@classmethod
@@ -178,129 +166,47 @@ class GroupByPruner:
cls._cache.clear()
# -----------------------------------
# Main Component
# -----------------------------------
class GroupByModelReader:
''' Find models and flow-models which contain attrib '''
class GroupByCreator:
'''
Process all children with matching conditions under specified page.
Creates a grouping of pages with similar (self-defined) attributes.
The grouping is performed only once per build (or manually invoked).
'''
def __init__(self):
self._flows: Dict[AttributeKey, Dict[str, Set[str]]] = {}
self._models: Dict[AttributeKey, Dict[str, Dict[str, str]]] = {}
self._func: Dict[str, Set[GroupProducer]] = {}
self._resolve_map: Dict[str, GroupBySource] = {} # only for server
self._watched_once: Set[GroupingCallback] = set()
# --------------
# Initialize
# --------------
def initialize(self, db: lektor.db):
self._flows.clear()
self._models.clear()
self._resolve_map.clear()
for prod_list in self._func.values():
for producer in prod_list:
self._register(db, producer.attribute)
def _register(self, db: lektor.db, attrib: AttributeKey) -> None:
''' Preparation: find models and flow-models which contain attrib '''
if attrib in self._flows or attrib in self._models:
return # already added
def __init__(self, db: Database, attrib: AttributeKey) -> None:
self._flows = {} # type: Dict[str, Set[str]]
self._models = {} # type: Dict[str, Dict[str, str]]
# find flow blocks with attrib
_flows = {} # Dict[str, Set[str]]
for key, flow in db.flowblocks.items():
tmp1 = set(f.name for f in flow.fields
if bool_from_string(f.options.get(attrib, False)))
if tmp1:
_flows[key] = tmp1
self._flows[key] = tmp1
# find models with attrib or flow-blocks containing attrib
_models = {} # Dict[str, Dict[str, str]]
for key, model in db.datamodels.items():
tmp2 = {} # Dict[str, str]
for field in model.fields:
if bool_from_string(field.options.get(attrib, False)):
tmp2[field.name] = '*' # include all children
elif isinstance(field.type, FlowType):
if any(x in _flows for x in field.type.flow_blocks):
elif isinstance(field.type, FlowType) and self._flows:
# only processed if at least one flow has attrib
fbs = field.type.flow_blocks
# if fbs == None, all flow-blocks are allowed
if fbs is None or any(x in self._flows for x in fbs):
tmp2[field.name] = '?' # only some flow blocks
if tmp2:
_models[key] = tmp2
self._models[key] = tmp2
self._flows[attrib] = _flows
self._models[attrib] = _models
# ----------------
# Add Observer
# ----------------
def watch(
def read(
self,
root: str,
attrib: AttributeKey, *,
flatten: bool = True, # if False, dont explode FlowType
slug: Optional[str] = None, # default: "{attrib}/{group}/index.html"
template: Optional[str] = None, # default: "groupby-attrib.html"
dependency: Optional[str] = None
) -> Callable[[GroupingCallback], None]:
'''
Decorator to subscribe to attrib-elements. Converter for groupby().
Refer to groupby() for further details.
(record, field-key, field) -> (group-key, extra-info)
:DEFAULTS:
template: "groupby-attrib.html"
slug: "{attrib}/{group}/index.html"
'''
root = root.rstrip('/') + '/'
def _decorator(fn: GroupingCallback):
if root not in self._func:
self._func[root] = set()
self._func[root].add(
GroupProducer(attrib, fn, flatten, template, slug, dependency))
return _decorator
def watch_once(self, *args, **kwarg) -> Callable[[GroupingCallback], None]:
''' Same as watch() but listener is auto removed after build. '''
def _decorator(fn: GroupingCallback):
self._watched_once.add(fn)
self.watch(*args, **kwarg)(fn)
return _decorator
def remove_watch_once(self) -> None:
''' Remove all watch-once listeners. '''
for k, v in self._func.items():
not_once = {x for x in v if x.func not in self._watched_once}
self._func[k] = not_once
self._watched_once.clear()
# ----------
# Helper
# ----------
def iter_record_fields(
self,
source: lektor.db.Record,
attrib: AttributeKey,
record: Record,
flatten: bool = False
) -> Iterator[Tuple[FieldKeyPath, FieldValue]]:
''' Enumerate all fields of a lektor.db.Record with attrib = True '''
assert isinstance(source, lektor.db.Record)
_flows = self._flows.get(attrib, {})
_models = self._models.get(attrib, {})
for r_key, subs in _models.get(source.datamodel.id, {}).items():
) -> Iterator[Tuple[FieldKeyPath, object]]:
'''
Enumerate all fields of a Record with attrib = True.
Flows are either returned directly (flatten=False) or
expanded so that each flow-block is yielded (flatten=True)
'''
assert isinstance(record, Record)
for r_key, subs in self._models.get(record.datamodel.id, {}).items():
if subs == '*': # either normal field or flow type (all blocks)
field = source[r_key]
field = record[r_key]
if flatten and isinstance(field, Flow):
for i, flow in enumerate(field.blocks):
flowtype = flow['_flowblock']
@@ -311,94 +217,224 @@ class GroupByCreator:
else:
yield FieldKeyPath(r_key), field
else: # always flow type (only some blocks)
for i, flow in enumerate(source[r_key].blocks):
for i, flow in enumerate(record[r_key].blocks):
flowtype = flow['_flowblock']
for f_key in _flows.get(flowtype, []):
for f_key in self._flows.get(flowtype, []):
yield FieldKeyPath(r_key, i, f_key), flow[f_key]
def groupby(
class GroupByState:
''' Holds and updates a groupby build state. '''
def __init__(self) -> None:
self.state = {} # type: Dict[GroupKey, Dict[Record, List]]
self._processed = set() # type: Set[Record]
def __contains__(self, record: Record) -> bool:
''' Returns True if record was already processed. '''
return record in self._processed
def items(self) -> Iterable[Tuple[GroupKey, Dict]]:
''' Iterable with (GroupKey, {record: extras}) tuples. '''
return self.state.items()
def add(self, record: Record, group: Dict[GroupKey, List]) -> None:
''' Append groups if not processed already. '''
if record not in self._processed:
self._processed.add(record)
for group_key, extras in group.items():
if group_key in self.state:
self.state[group_key][record] = extras
else:
self.state[group_key] = {record: extras}
class GroupByWatcher:
'''
Callback is called with (Record, FieldKeyPath, field-value).
Callback may yield one or more (group-key, extra-info) tuples.
'''
def __init__(
self,
root: str,
attrib: AttributeKey,
root: lektor.db.Record,
func: GroupingCallback,
flatten: bool = False,
incl_attachments: bool = True
) -> Dict[GroupKey, List[GroupComponent]]:
'''
Traverse selected root record with all children and group by func.
Func is called with (record, FieldKeyPath, FieldValue).
Func may yield one or more (group-key, extra-info) tuples.
callback: GroupingCallback, *,
slug: Optional[str] = None, # default: "{attrib}/{group}/index.html"
template: Optional[str] = None # default: "groupby-attrib.html"
) -> None:
self.root = root
self.attrib = attrib
self.callback = callback
self.slug = slug
self.template = template
# user editable attributes
self.flatten = True # if False, dont explode FlowType
self.dependencies = set() # type: Set[str]
return {'group-key': [(record, extra-info), ...]}
'''
assert callable(func), 'no GroupingCallback provided'
assert isinstance(root, lektor.db.Record)
tmap = {} # type: Dict[GroupKey, List[GroupComponent]]
recursive_list = [root] # type: List[lektor.db.Record]
while recursive_list:
record = recursive_list.pop()
if hasattr(record, 'children'):
# recursive_list += record.children
recursive_list.extend(record.children)
if incl_attachments and hasattr(record, 'attachments'):
# recursive_list += record.attachments
recursive_list.extend(record.attachments)
for key, field in self.iter_record_fields(record, attrib, flatten):
for ret in func(GroupByCallbackArgs(record, key, field)) or []:
assert isinstance(ret, (tuple, list)), \
'Must return tuple (group-key, extra-info)'
group_key, extras = ret
if group_key not in tmap:
tmap[group_key] = []
tmap[group_key].append(GroupComponent(record, extras))
return tmap
# -----------------
# Create groups
# -----------------
def initialize(self, db: Database) -> None:
''' Reset internal state. You must initialize before each build! '''
self._state = GroupByState()
self._model_reader = GroupByModelReader(db, self.attrib)
def should_process(self, node: SourceObject) -> bool:
''' Check if record path is being watched. '''
return isinstance(node, lektor.db.Record) \
and node.url_path in self._func
if isinstance(node, Record):
p = node['_path'] # type: str
return p.startswith(self.root) or p + '/' == self.root
return False
def make_cluster(self, root: lektor.db.Record) -> Iterator[GroupBySource]:
''' Group by attrib and build Artifacts. '''
assert isinstance(root, lektor.db.Record)
for attr, fn, fl, temp, slug, dep in self._func.get(root.url_path, []):
groups = self.groupby(attr, root, func=fn, flatten=fl)
for group_key, children in groups.items():
obj = GroupBySource(root, attr, group_key, children,
template=temp, slug=slug, dependency=dep)
self.track_dev_server_path(obj)
yield obj
def process(self, record: Record) -> None:
'''
Will iterate over all record fields and call the callback method.
Each record is guaranteed to be processed only once.
'''
if record in self._state:
return
tmp = {}
for key, field in self._model_reader.read(record, self.flatten):
for ret in self.callback(GroupByCallbackArgs(record, key, field)):
assert isinstance(ret, (tuple, list)), \
'Must return tuple (group-key, extra-info)'
group_key, extra = ret
if group_key not in tmp:
tmp[group_key] = [extra]
else:
tmp[group_key].append(extra)
self._state.add(record, tmp)
# ------------------
# Path resolving
# ------------------
def iter_sources(self, root: Record) -> Iterator[GroupBySource]:
''' Prepare and yield GroupBySource elements. '''
for group_key, children in self._state.items():
src = GroupBySource(root, self.attrib, group_key,
slug=self.slug, template=self.template)
src.dependencies = self.dependencies
src.children = children
yield src
def resolve_virtual_path(
self, node: SourceObject, pieces: List[str]
) -> Optional[GroupBySource]:
''' Given a @VPATH/attrib/groupkey path, determine url path. '''
if len(pieces) >= 2:
attrib: AttributeKey = pieces[0] # type: ignore[assignment]
group: GroupKey = pieces[1] # type: ignore[assignment]
for attr, _, _, _, slug, _ in self._func.get(node.url_path, []):
if attr == attrib:
# TODO: do we need to provide the template too?
return GroupBySource(node, attr, group, slug=slug)
return None
def __str__(self) -> str:
txt = '<GroupByWatcher'
for x in [
'root', 'attrib', 'slug', 'template', 'flatten', 'dependencies'
]:
txt += ' {}="{}"'.format(x, getattr(self, x))
return txt + '>'
def track_dev_server_path(self, sender: GroupBySource) -> None:
''' Dev server only: Add target path to reverse artifact url lookup '''
self._resolve_map[sender.url_path] = sender
# -----------------------------------
# Main Component
# -----------------------------------
class GroupByCreator:
'''
Process all children with matching conditions under specified page.
Creates a grouping of pages with similar (self-defined) attributes.
The grouping is performed only once per build.
'''
def __init__(self) -> None:
self._watcher = [] # type: List[GroupByWatcher]
self._results = {} # type: Dict[str, GroupBySource]
self._resolve_map = {} # type: Dict[str, ResolverConf]
# ----------------
# Add Observer
# ----------------
def depends_on(self, *args: str) \
-> Callable[[GroupByWatcher], GroupByWatcher]:
''' Set GroupBySource dependency, e.g., a plugin config file. '''
def _decorator(r: GroupByWatcher) -> GroupByWatcher:
r.dependencies.update(list(args))
return r
return _decorator
def watch(
self,
root: str,
attrib: AttributeKey, *,
slug: Optional[str] = None, # default: "{attrib}/{group}/index.html"
template: Optional[str] = None, # default: "groupby-attrib.html"
flatten: bool = True, # if False, dont explode FlowType
) -> Callable[[GroupingCallback], GroupByWatcher]:
'''
Decorator to subscribe to attrib-elements.
(record, field-key, field) -> (group-key, extra-info)
:DEFAULTS:
slug: "{attrib}/{group}/index.html"
template: "groupby-attrib.html"
'''
root = root.rstrip('/') + '/'
def _decorator(fn: GroupingCallback) -> GroupByWatcher:
w = GroupByWatcher(root, attrib, fn, slug=slug, template=template)
w.flatten = flatten
self._watcher.append(w)
return w
return _decorator
# -----------
# Builder
# -----------
def clear_previous_results(self) -> None:
''' Reset prvious results. Must be called before each build. '''
self._watcher.clear()
self._results.clear()
self._resolve_map.clear()
def make_cluster(self, builder: Builder) -> None:
''' Perform groupby, iterate over all children. '''
if not self._watcher:
return
for w in self._watcher:
w.initialize(builder.pad.db)
queue = builder.pad.get_all_roots() # type: List[SourceObject]
while queue:
record = queue.pop()
self.queue_now(record)
if hasattr(record, 'attachments'):
queue.extend(record.attachments) # type: ignore[attr-defined]
if hasattr(record, 'children'):
queue.extend(record.children) # type: ignore[attr-defined]
# build artifacts
for w in self._watcher:
root = builder.pad.get(w.root)
for vobj in w.iter_sources(root):
self._results[vobj.url_path] = vobj
self._watcher.clear()
def queue_now(self, node: SourceObject) -> None:
''' Process record immediatelly (No-Op if already processed). '''
for w in self._watcher:
if w.should_process(node): # ensures type Record
w.process(node) # type: ignore[arg-type]
def build_all(self, builder: Builder) -> None:
''' Create virtual objects and build sources. '''
for url, x in sorted(self._results.items()):
builder.build(x)
self._resolve_map[url] = ResolverConf(x.attrib, x.group, x.slug)
self._results.clear()
# -----------------
# Path resolver
# -----------------
def resolve_dev_server_path(
self, node: SourceObject, pieces: List[str]
) -> Optional[GroupBySource]:
''' Dev server only: Resolve actual url to virtual obj. '''
return self._resolve_map.get(build_url([node.url_path] + pieces))
''' Dev server only: Resolves path/ -> path/index.html '''
if not isinstance(node, Record):
return None
conf = self._resolve_map.get(build_url([node.url_path] + pieces))
if not conf:
return None
return GroupBySource(node, conf.attrib, conf.group, slug=conf.slug)
# -----------------------------------
@@ -410,31 +446,17 @@ class GroupByPlugin(Plugin):
name = 'GroupBy Plugin'
description = 'Cluster arbitrary records with field attribute keyword.'
def on_setup_env(self, **extra):
def on_setup_env(self, **extra: object) -> None:
self.creator = GroupByCreator()
self.env.add_build_program(GroupBySource, GroupByBuildProgram)
# let other plugins register their @groupby.watch functions
self.emit('init', groupby=self.creator, **extra)
# resolve /tag/rss/ -> /tag/rss/index.html (local server only)
@self.env.urlresolver
def groupby_path_resolver(node, pieces):
if self.creator.should_process(node):
return self.creator.resolve_dev_server_path(node, pieces)
def _(node: SourceObject, parts: List[str]) -> Optional[GroupBySource]:
return self.creator.resolve_dev_server_path(node, parts)
# use VPATH in templates: {{ '/@groupby/attrib/group' | url }}
@self.env.virtualpathresolver(VPATH.lstrip('@'))
def groupby_virtualpath_resolver(node, pieces):
if self.creator.should_process(node):
return self.creator.resolve_virtual_path(node, pieces)
# injection to generate GroupBy nodes when processing artifacts
# @self.env.generator
# def groupby_generator(node):
# if self.creator.should_process(node):
# yield from self.creator.make_cluster(node)
def _quick_config(self):
def _load_quick_config(self) -> None:
''' Load config file quick listeners. '''
config = self.get_config()
for attrib in config.sections():
sect = config.section_as_dict(attrib)
@@ -443,9 +465,10 @@ class GroupByPlugin(Plugin):
temp = sect.get('template')
split = sect.get('split')
@self.creator.watch_once(root, attrib, template=temp, slug=slug,
dependency=self.config_filename)
def _fn(args):
@self.creator.depends_on(self.config_filename)
@self.creator.watch(root, attrib, slug=slug, template=temp)
def _fn(args: GroupByCallbackArgs) \
-> Iterator[Tuple[GroupKey, object]]:
val = args.field
if isinstance(val, str):
val = val.split(split) if split else [val] # make list
@@ -453,26 +476,23 @@ class GroupByPlugin(Plugin):
for tag in val:
yield slugify(tag), tag
def on_before_build_all(self, builder, **extra):
# let other plugins register their @groupby.watch_once functions
self.emit('init-once', groupby=self.creator, builder=builder, **extra)
# load config file quick listeners (before initialize!)
self._quick_config()
# parse all models to detect attribs of listeners
self.creator.initialize(builder.pad.db)
def on_before_build_all(self, builder: Builder, **extra: object) -> None:
self.creator.clear_previous_results()
# let other plugins register their @groupby.watch functions
self.emit('before-build-all', groupby=self.creator, builder=builder)
self.creator.make_cluster(builder)
def on_before_build(self, builder, build_state, source, prog, **extra):
# Injection to create GroupBy nodes before parent node is built.
# Use this callback (not @generator) to modify parent beforehand.
# Relevant for the root page which is otherwise build before GroupBy.
if self.creator.should_process(source):
for vobj in self.creator.make_cluster(source):
builder.build(vobj)
def on_before_build(self, source: SourceObject, **extra: object) -> None:
# before-build may be called before before-build-all (issue #1017)
# make sure it is evaluated immediatelly
self.creator.queue_now(source)
def on_after_build_all(self, builder, **extra):
# remove all quick listeners (will be added again in the next build)
self.creator.remove_watch_once()
def on_after_build_all(self, builder: Builder, **extra: object) -> None:
self.emit('after-build-all', groupby=self.creator, builder=builder)
self._load_quick_config()
self.creator.make_cluster(builder)
self.creator.build_all(builder)
def on_after_prune(self, builder, **extra):
# TODO: find better way to prune unreferenced elements
def on_after_prune(self, builder: Builder, **extra: object) -> None:
# TODO: find a better way to prune unreferenced elements
GroupByPruner.prune(builder)