efficient build

- postpone building until really needed
- rebuild only if artifacts change
- no build on source update
- prune takes current resolver state instead of global var
This commit is contained in:
relikd
2022-04-13 15:41:57 +02:00
parent 8ae5376d41
commit 1d9629566c
8 changed files with 302 additions and 270 deletions

59
lektor_groupby/backref.py Normal file
View File

@@ -0,0 +1,59 @@
from lektor.context import get_ctx
from typing import TYPE_CHECKING, Iterator
from weakref import WeakSet
if TYPE_CHECKING:
from lektor.builder import Builder
from lektor.db import Record
from .groupby import GroupBy
from .vobj import GroupBySource
class GroupByRef:
    '''
    Store and retrieve the per-builder GroupBy instance.
    The value is kept on the Builder object itself under the
    name-mangled attribute "_GroupByRef__groupby".
    '''

    @staticmethod
    def of(builder: 'Builder') -> 'GroupBy':
        ''' Get the GroupBy object of a builder (AttributeError if unset). '''
        # explicit mangled name; identical to `builder.__groupby` in this class
        return getattr(builder, '_GroupByRef__groupby')

    @staticmethod
    def set(builder: 'Builder', groupby: 'GroupBy') -> None:
        ''' Set the GroupBy object of a builder. '''
        setattr(builder, '_GroupByRef__groupby', groupby)
class VGroups:
    '''
    Reverse reference: map a page Record to the groupby virtual
    objects that include it. The set is stored on the record as a
    WeakSet under the name-mangled attribute "_VGroups__vgroups".
    '''

    @staticmethod
    def of(record: 'Record') -> WeakSet:
        '''
        Return the (weak) set of virtual objects of a page.
        Creates a new set if it does not exist yet.
        '''
        # same mangled attribute as `record.__vgroups` inside this class
        wset = getattr(record, '_VGroups__vgroups', None)
        if wset is None:
            wset = WeakSet()
            setattr(record, '_VGroups__vgroups', wset)
        return wset

    @staticmethod
    def iter(record: 'Record', *keys: str, recursive: bool = False) \
            -> Iterator['GroupBySource']:
        ''' Extract all referencing groupby virtual objects from a page. '''
        ctx = get_ctx()
        if not ctx:
            raise NotImplementedError("Shouldn't happen, where is my context?")
        # get GroupBy object attached to the current builder
        builder = ctx.build_state.builder
        groupby = GroupByRef.of(builder)
        groupby.make_once(builder)  # ensure did cluster before
        # manage config dependencies
        for dep in groupby.dependencies:
            ctx.record_dependency(dep)
        # breadth-first walk: the page itself, then (optionally) children
        pending = [record]
        while pending:
            page = pending.pop(0)
            if recursive and hasattr(page, 'children'):
                pending.extend(page.children)  # type: ignore[attr-defined]
            for vobj in VGroups.of(page):
                # no keys given -> yield everything; else filter by config key
                if not keys or vobj.config.key in keys:
                    yield vobj

View File

@@ -1,12 +1,14 @@
from lektor.builder import Builder, PathCache
from lektor.db import Record # typing
from lektor.sourceobj import SourceObject # typing
from typing import Set, List
from .vobj import GroupBySource # typing
from .config import Config, AnyConfig
from .resolver import Resolver # typing
from lektor.builder import PathCache
from lektor.db import Record # isinstance
from typing import TYPE_CHECKING, Set, List
from .config import Config
from .watcher import Watcher
if TYPE_CHECKING:
from .config import AnyConfig
from lektor.builder import Builder
from lektor.sourceobj import SourceObject
from .resolver import Resolver
from .vobj import GroupBySource
class GroupBy:
@@ -16,11 +18,12 @@ class GroupBy:
The grouping is performed only once per build.
'''
def __init__(self) -> None:
def __init__(self, resolver: 'Resolver') -> None:
self._watcher = [] # type: List[Watcher]
self._results = [] # type: List[GroupBySource]
self.resolver = resolver
def add_watcher(self, key: str, config: AnyConfig) -> Watcher:
def add_watcher(self, key: str, config: 'AnyConfig') -> Watcher:
''' Init Config and add to watch list. '''
w = Watcher(Config.from_any(key, config))
self._watcher.append(w)
@@ -32,8 +35,9 @@ class GroupBy:
deps.update(w.config.dependencies)
return deps
def queue_all(self, builder: Builder) -> None:
def queue_all(self, builder: 'Builder') -> None:
''' Iterate full site-tree and queue all children. '''
self.dependencies = self.get_dependencies()
# remove disabled watchers
self._watcher = [w for w in self._watcher if w.config.enabled]
if not self._watcher:
@@ -45,30 +49,29 @@ class GroupBy:
queue = builder.pad.get_all_roots() # type: List[SourceObject]
while queue:
record = queue.pop()
self.queue_now(record)
if hasattr(record, 'attachments'):
queue.extend(record.attachments) # type: ignore[attr-defined]
if hasattr(record, 'children'):
queue.extend(record.children) # type: ignore[attr-defined]
if isinstance(record, Record):
for w in self._watcher:
if w.should_process(record):
w.process(record)
def queue_now(self, node: SourceObject) -> None:
''' Process record immediately (No-Op if already processed). '''
if isinstance(node, Record):
for w in self._watcher:
if w.should_process(node):
w.process(node)
def make_cluster(self, builder: Builder, resolver: Resolver) -> None:
def make_once(self, builder: 'Builder') -> None:
''' Perform groupby, iter over sources with watcher callback. '''
for w in self._watcher:
root = builder.pad.get(w.config.root)
for vobj in w.iter_sources(root):
self._results.append(vobj)
resolver.add(vobj)
self._watcher.clear()
if self._watcher:
self.resolver.reset()
for w in self._watcher:
root = builder.pad.get(w.config.root)
for vobj in w.iter_sources(root):
self._results.append(vobj)
self.resolver.add(vobj)
self._watcher.clear()
def build_all(self, builder: Builder) -> None:
def build_all(self, builder: 'Builder') -> None:
''' Create virtual objects and build sources. '''
self.make_once(builder) # in case no page used the |vgroups filter
path_cache = PathCache(builder.env)
for vobj in self._results:
if vobj.slug:

66
lektor_groupby/model.py Normal file
View File

@@ -0,0 +1,66 @@
from lektor.db import Database, Record # typing
from lektor.types.flow import Flow, FlowType
from lektor.utils import bool_from_string
from typing import Set, Dict, Tuple, Any, NamedTuple, Optional, Iterator
class FieldKeyPath(NamedTuple):
    ''' Address of a single data field inside a record. '''
    fieldKey: str  # name of the model field on the record
    flowIndex: Optional[int] = None  # block index, set only for fields inside a Flow
    flowKey: Optional[str] = None  # field name inside the referenced flow block
class ModelReader:
    '''
    Find models and flow-models which contain attribute.
    Flows are either returned directly (flatten=False) or
    expanded so that each flow-block is yielded (flatten=True)
    '''

    def __init__(self, db: Database, attr: str, flatten: bool = False) -> None:
        '''
        Scan all flow-blocks and data-models of *db* for fields whose
        options carry a truthy *attr* flag and remember where they are.
        '''
        self.flatten = flatten
        self._flows = {}  # type: Dict[str, Set[str]]
        self._models = {}  # type: Dict[str, Dict[str, str]]
        # find flow blocks containing attribute
        for key, flow in db.flowblocks.items():
            marked = {f.name for f in flow.fields
                      if bool_from_string(f.options.get(attr, False))}
            if marked:
                self._flows[key] = marked
        # find models and flow-blocks containing attribute
        for key, model in db.datamodels.items():
            fields = {}  # type: Dict[str, str]
            for field in model.fields:
                if bool_from_string(field.options.get(attr, False)):
                    fields[field.name] = '*'  # include all children
                elif isinstance(field.type, FlowType) and self._flows:
                    # only processed if at least one flow has attr
                    fbs = field.type.flow_blocks
                    # if fbs == None, all flow-blocks are allowed
                    if fbs is None or any(x in self._flows for x in fbs):
                        fields[field.name] = '?'  # only some flow blocks
            if fields:
                self._models[key] = fields

    def read(self, record: Record) -> Iterator[Tuple[FieldKeyPath, Any]]:
        '''
        Enumerate all fields of a Record with attrib = True.
        Yields (FieldKeyPath, field-value) tuples.
        '''
        assert isinstance(record, Record)
        for r_key, subs in self._models.get(record.datamodel.id, {}).items():
            field = record[r_key]
            if not field:
                continue
            if subs == '*':  # either normal field or flow type (all blocks)
                if self.flatten and isinstance(field, Flow):
                    # expand each flow-block into its individual data fields
                    # (fix: dropped unused `flowtype` lookup in this branch)
                    for i, flow in enumerate(field.blocks):
                        for f_key, block in flow._data.items():
                            if f_key.startswith('_'):  # e.g., _flowblock
                                continue
                            yield FieldKeyPath(r_key, i, f_key), block
                else:
                    yield FieldKeyPath(r_key), field
            else:  # always flow type (only some blocks)
                for i, flow in enumerate(field.blocks):
                    # only yield fields the block's model marked with attr
                    for f_key in self._flows.get(flow['_flowblock'], []):
                        yield FieldKeyPath(r_key, i, f_key), flow[f_key]

View File

@@ -1,14 +1,15 @@
from lektor.builder import Builder # typing
from lektor.db import Page # typing
from lektor.db import Page # isinstance
from lektor.pluginsystem import Plugin # subclass
from lektor.sourceobj import SourceObject # typing
from typing import Iterator, Any
from .vobj import GroupBySource, GroupByBuildProgram, VPATH, VGroups
from typing import TYPE_CHECKING, Iterator, Any
from .backref import GroupByRef, VGroups
from .groupby import GroupBy
from .pruner import prune
from .resolver import Resolver
from .watcher import GroupByCallbackArgs # typing
from .vobj import VPATH, GroupBySource, GroupByBuildProgram
if TYPE_CHECKING:
from lektor.builder import Builder, BuildState
from lektor.sourceobj import SourceObject
from .watcher import GroupByCallbackArgs
class GroupByPlugin(Plugin):
@@ -16,10 +17,51 @@ class GroupByPlugin(Plugin):
description = 'Cluster arbitrary records with field attribute keyword.'
def on_setup_env(self, **extra: Any) -> None:
self.has_changes = False
self.resolver = Resolver(self.env)
self.env.add_build_program(GroupBySource, GroupByBuildProgram)
self.env.jinja_env.filters.update(vgroups=VGroups.iter)
def on_before_build(
self, builder: 'Builder', source: 'SourceObject', **extra: Any
) -> None:
# before-build may be called before before-build-all (issue #1017)
# make sure it is always evaluated first
if isinstance(source, Page):
self._init_once(builder)
def on_after_build(self, build_state: 'BuildState', **extra: Any) -> None:
if build_state.updated_artifacts:
self.has_changes = True
def on_after_build_all(self, builder: 'Builder', **extra: Any) -> None:
# only rebuild if has changes (bypass idle builds)
# or the very first time after startup (url resolver & pruning)
if self.has_changes or not self.resolver.has_any:
self._init_once(builder).build_all(builder) # updates resolver
self.has_changes = False
def on_after_prune(self, builder: 'Builder', **extra: Any) -> None:
# TODO: find a better way to prune unreferenced elements
prune(builder, VPATH, self.resolver.files)
# ------------
# internal
# ------------
def _init_once(self, builder: 'Builder') -> GroupBy:
try:
return GroupByRef.of(builder)
except AttributeError:
groupby = GroupBy(self.resolver)
GroupByRef.set(builder, groupby)
self._load_quick_config(groupby)
# let other plugins register their @groupby.watch functions
self.emit('before-build-all', groupby=groupby, builder=builder)
groupby.queue_all(builder)
return groupby
def _load_quick_config(self, groupby: GroupBy) -> None:
''' Load config file quick listeners. '''
config = self.get_config()
@@ -31,39 +73,9 @@ class GroupByPlugin(Plugin):
split = config.get(key + '.split') # type: str
@watcher.grouping()
def _fn(args: GroupByCallbackArgs) -> Iterator[str]:
def _fn(args: 'GroupByCallbackArgs') -> Iterator[str]:
val = args.field
if isinstance(val, str):
val = map(str.strip, val.split(split)) if split else [val]
if isinstance(val, (list, map)):
yield from val
def _init_once(self, builder: Builder) -> GroupBy:
try:
return builder.__groupby # type:ignore[attr-defined,no-any-return]
except AttributeError:
groupby = GroupBy()
builder.__groupby = groupby # type: ignore[attr-defined]
self.resolver.reset()
self._load_quick_config(groupby)
# let other plugins register their @groupby.watch functions
self.emit('before-build-all', groupby=groupby, builder=builder)
self.config_dependencies = groupby.get_dependencies()
groupby.queue_all(builder)
groupby.make_cluster(builder, self.resolver)
return groupby
def on_before_build(self, builder: Builder, source: SourceObject,
**extra: Any) -> None:
# before-build may be called before before-build-all (issue #1017)
# make sure it is evaluated immediately
if isinstance(source, Page):
self._init_once(builder)
def on_after_build_all(self, builder: Builder, **extra: object) -> None:
self._init_once(builder).build_all(builder)
def on_after_prune(self, builder: Builder, **extra: object) -> None:
# TODO: find a better way to prune unreferenced elements
prune(builder, VPATH)

View File

@@ -2,29 +2,36 @@
Static collector for build-artifact urls.
All non-tracked VPATH-urls will be pruned after build.
'''
from lektor.builder import Builder # typing
from lektor.reporter import reporter # report_pruned_artifact
from lektor.utils import prune_file_and_folder
_cache = set()
# Note: this var is static or otherwise two instances of
# this module would prune each others artifacts.
from typing import TYPE_CHECKING, Set, Iterable
if TYPE_CHECKING:
from lektor.builder import Builder
def track_not_prune(url: str) -> None:
''' Add url to build cache to prevent pruning. '''
_cache.add(url.lstrip('/'))
def _normalize_url_cache(url_cache: Iterable[str]) -> Set[str]:
cache = set()
for url in url_cache:
if url.endswith('/'):
url += 'index.html'
cache.add(url.lstrip('/'))
return cache
def prune(builder: Builder, vpath: str) -> None:
''' Remove previously generated, unreferenced Artifacts. '''
def prune(builder: 'Builder', vpath: str, url_cache: Iterable[str]) -> None:
'''
Remove previously generated, unreferenced Artifacts.
All urls in url_cache must have a trailing "/index.html" (instead of "/")
and also, no leading slash, "blog/index.html" instead of "/blog/index.html"
'''
vpath = '@' + vpath.lstrip('@') # just in case of user error
dest_path = builder.destination_path
url_cache = _normalize_url_cache(url_cache)
con = builder.connect_to_database()
try:
with builder.new_build_state() as build_state:
for url, file in build_state.iter_artifacts():
if url.lstrip('/') in _cache:
if url.lstrip('/') in url_cache:
continue # generated in this build-run
infos = build_state.get_artifact_dependency_infos(url, [])
for artifact_name, _ in infos:
@@ -36,4 +43,3 @@ def prune(builder: Builder, vpath: str) -> None:
break # there is only one VPATH-entry per source
finally:
con.close()
_cache.clear()

View File

@@ -1,11 +1,11 @@
from lektor.db import Record
from lektor.environment import Environment
from lektor.sourceobj import SourceObject
from lektor.db import Record # isinstance
from lektor.utils import build_url
from typing import Dict, List, Tuple, Optional
from .config import Config # typing
from .vobj import GroupBySource, VPATH
from typing import TYPE_CHECKING, Dict, List, Tuple, Optional, Iterable
from .vobj import VPATH, GroupBySource
if TYPE_CHECKING:
from lektor.environment import Environment
from lektor.sourceobj import SourceObject
from .config import Config
class Resolver:
@@ -14,31 +14,18 @@ class Resolver:
Init will subscribe to @urlresolver and @virtualpathresolver.
'''
def __init__(self, env: Environment) -> None:
def __init__(self, env: 'Environment') -> None:
self._data = {} # type: Dict[str, Tuple[str, Config]]
env.urlresolver(self.resolve_server_path)
env.virtualpathresolver(VPATH.lstrip('@'))(self.resolve_virtual_path)
# Local server only: resolve /tag/rss/ -> /tag/rss/index.html
@env.urlresolver
def dev_server_path(node: SourceObject, pieces: List[str]) \
-> Optional[GroupBySource]:
if isinstance(node, Record):
rv = self._data.get(build_url([node.url_path] + pieces))
if rv:
return GroupBySource(node, group=rv[0], config=rv[1])
return None
@property
def has_any(self) -> bool:
return bool(self._data)
# Admin UI only: Prevent server error and null-redirect.
@env.virtualpathresolver(VPATH.lstrip('@'))
def virtual_path(node: SourceObject, pieces: List[str]) \
-> Optional[GroupBySource]:
if isinstance(node, Record) and len(pieces) >= 2:
path = node['_path'] # type: str
key, grp, *_ = pieces
for group, conf in self._data.values():
if key == conf.key and path == conf.root:
if conf.slugify(group) == grp:
return GroupBySource(node, group, conf)
return None
@property
def files(self) -> Iterable[str]:
return self._data
def reset(self) -> None:
''' Clear previously recorded virtual objects. '''
@@ -48,3 +35,28 @@ class Resolver:
''' Track new virtual object (only if slug is set). '''
if vobj.slug:
self._data[vobj.url_path] = (vobj.group, vobj.config)
# ------------
# Resolver
# ------------
def resolve_server_path(self, node: 'SourceObject', pieces: List[str]) \
-> Optional[GroupBySource]:
''' Local server only: resolve /tag/rss/ -> /tag/rss/index.html '''
if isinstance(node, Record):
rv = self._data.get(build_url([node.url_path] + pieces))
if rv:
return GroupBySource(node, group=rv[0], config=rv[1])
return None
def resolve_virtual_path(self, node: 'SourceObject', pieces: List[str]) \
-> Optional[GroupBySource]:
''' Admin UI only: Prevent server error and null-redirect. '''
if isinstance(node, Record) and len(pieces) >= 2:
path = node['_path'] # type: str
key, grp, *_ = pieces
for group, conf in self._data.values():
if key == conf.key and path == conf.root:
if conf.slugify(group) == grp:
return GroupBySource(node, group, conf)
return None

View File

@@ -1,16 +1,15 @@
from lektor.build_programs import BuildProgram # subclass
from lektor.builder import Artifact # typing
from lektor.context import get_ctx
from lektor.db import Record # typing
from lektor.environment import Expression
from lektor.sourceobj import VirtualSourceObject # subclass
from lektor.utils import build_url
from typing import Dict, List, Any, Optional, Iterator
from weakref import WeakSet
from .config import Config
from .pruner import track_not_prune
from typing import TYPE_CHECKING, Dict, List, Any, Optional, Iterator
from .backref import VGroups
from .util import report_config_error
if TYPE_CHECKING:
from lektor.builder import Artifact
from lektor.db import Record
from .config import Config
VPATH = '@groupby' # potentially unsafe. All matching entries are pruned.
@@ -28,15 +27,16 @@ class GroupBySource(VirtualSourceObject):
def __init__(
self,
record: Record,
record: 'Record',
group: str,
config: Config,
children: Optional[Dict[Record, List[Any]]] = None,
config: 'Config',
children: Optional[Dict['Record', List[Any]]] = None,
) -> None:
super().__init__(record)
self.key = config.slugify(group)
self.group = group
self.config = config
self._children = children or {} # type: Dict[Record, List[Any]]
# evaluate slug Expression
if config.slug and '{key}' in config.slug:
self.slug = config.slug.replace('{key}', self.key)
@@ -45,16 +45,12 @@ class GroupBySource(VirtualSourceObject):
assert self.slug != Ellipsis, 'invalid config: ' + config.slug
if self.slug and self.slug.endswith('/index.html'):
self.slug = self.slug[:-10]
# make sure children are on the same pad
self._children = {} # type: Dict[Record, List[Any]]
for child, extras in (children or {}).items():
if child.pad != record.pad:
child = record.pad.get(child.path)
self._children[child] = extras
VGroups.of(child).add(self)
# extra fields
for attr, expr in config.fields.items():
setattr(self, attr, self._eval(expr, field='fields.' + attr))
# back-ref
for child in self._children:
VGroups.of(child).add(self)
def _eval(self, value: Any, *, field: str) -> Any:
''' Internal only: evaluates Lektor config file field expression. '''
@@ -94,12 +90,12 @@ class GroupBySource(VirtualSourceObject):
# -----------------------
@property
def children(self) -> Dict[Record, List[Any]]:
def children(self) -> Dict['Record', List[Any]]:
''' Returns dict with page record key and (optional) extra value. '''
return self._children
@property
def first_child(self) -> Optional[Record]:
def first_child(self) -> Optional['Record']:
''' Returns first referencing page record. '''
if self._children:
return iter(self._children).__next__()
@@ -139,44 +135,6 @@ class GroupBySource(VirtualSourceObject):
self.path, len(self._children))
# -----------------------------------
# Reverse Reference
# -----------------------------------
class VGroups:
@staticmethod
def of(record: Record) -> WeakSet:
try:
wset = record.__vgroups # type: ignore[attr-defined]
except AttributeError:
wset = WeakSet()
record.__vgroups = wset # type: ignore[attr-defined]
return wset # type: ignore[no-any-return]
@staticmethod
def iter(
record: Record,
*keys: str,
recursive: bool = False
) -> Iterator[GroupBySource]:
''' Extract all referencing groupby virtual objects from a page. '''
ctx = get_ctx()
# manage dependencies
if ctx:
for dep in ctx.env.plugins['groupby'].config_dependencies:
ctx.record_dependency(dep)
# find groups
proc_list = [record]
while proc_list:
page = proc_list.pop(0)
if recursive and hasattr(page, 'children'):
proc_list.extend(page.children) # type: ignore[attr-defined]
for vobj in VGroups.of(page):
vobj.config.dependencies
if not keys or vobj.config.key in keys:
yield vobj
# -----------------------------------
# BuildProgram
# -----------------------------------
@@ -190,9 +148,8 @@ class GroupByBuildProgram(BuildProgram):
url += 'index.html'
self.declare_artifact(url, sources=list(
self.source.iter_source_filenames()))
track_not_prune(url)
def build_artifact(self, artifact: Artifact) -> None:
def build_artifact(self, artifact: 'Artifact') -> None:
get_ctx().record_virtual_dependency(self.source)
artifact.render_template_into(
self.source.config.template, this=self.source)

View File

@@ -1,27 +1,17 @@
from lektor.db import Database, Record # typing
from lektor.types.flow import Flow, FlowType
from lektor.utils import bool_from_string
from typing import Set, Dict, List, Tuple, Any, Union, NamedTuple
from typing import TYPE_CHECKING, Dict, List, Tuple, Any, Union, NamedTuple
from typing import Optional, Callable, Iterator, Generator
from .vobj import GroupBySource
from .config import Config
from .model import ModelReader
from .util import most_used_key
# -----------------------------------
# Typing
# -----------------------------------
class FieldKeyPath(NamedTuple):
fieldKey: str
flowIndex: Optional[int] = None
flowKey: Optional[str] = None
from .vobj import GroupBySource
if TYPE_CHECKING:
from lektor.db import Database, Record
from .config import Config
from .model import FieldKeyPath
class GroupByCallbackArgs(NamedTuple):
record: Record
key: FieldKeyPath
record: 'Record'
key: 'FieldKeyPath'
field: Any # lektor model data-field value
@@ -31,83 +21,15 @@ GroupingCallback = Callable[[GroupByCallbackArgs], Union[
]]
# -----------------------------------
# ModelReader
# -----------------------------------
class GroupByModelReader:
''' Find models and flow-models which contain attribute '''
def __init__(self, db: Database, attrib: str) -> None:
self._flows = {} # type: Dict[str, Set[str]]
self._models = {} # type: Dict[str, Dict[str, str]]
# find flow blocks containing attribute
for key, flow in db.flowblocks.items():
tmp1 = set(f.name for f in flow.fields
if bool_from_string(f.options.get(attrib, False)))
if tmp1:
self._flows[key] = tmp1
# find models and flow-blocks containing attribute
for key, model in db.datamodels.items():
tmp2 = {} # Dict[str, str]
for field in model.fields:
if bool_from_string(field.options.get(attrib, False)):
tmp2[field.name] = '*' # include all children
elif isinstance(field.type, FlowType) and self._flows:
# only processed if at least one flow has attrib
fbs = field.type.flow_blocks
# if fbs == None, all flow-blocks are allowed
if fbs is None or any(x in self._flows for x in fbs):
tmp2[field.name] = '?' # only some flow blocks
if tmp2:
self._models[key] = tmp2
def read(
self,
record: Record,
flatten: bool = False
) -> Iterator[Tuple[FieldKeyPath, Any]]:
'''
Enumerate all fields of a Record with attrib = True.
Flows are either returned directly (flatten=False) or
expanded so that each flow-block is yielded (flatten=True)
'''
assert isinstance(record, Record)
for r_key, subs in self._models.get(record.datamodel.id, {}).items():
field = record[r_key]
if not field:
continue
if subs == '*': # either normal field or flow type (all blocks)
if flatten and isinstance(field, Flow):
for i, flow in enumerate(field.blocks):
flowtype = flow['_flowblock']
for f_key, block in flow._data.items():
if f_key.startswith('_'): # e.g., _flowblock
continue
yield FieldKeyPath(r_key, i, f_key), block
else:
yield FieldKeyPath(r_key), field
else: # always flow type (only some blocks)
for i, flow in enumerate(field.blocks):
flowtype = flow['_flowblock']
for f_key in self._flows.get(flowtype, []):
yield FieldKeyPath(r_key, i, f_key), flow[f_key]
# -----------------------------------
# Watcher
# -----------------------------------
class Watcher:
'''
Callback is called with (Record, FieldKeyPath, field-value).
Callback may yield one or more (group, extra-info) tuples.
'''
def __init__(self, config: Config) -> None:
def __init__(self, config: 'Config') -> None:
self.config = config
self.flatten = True
self.callback = None # type: GroupingCallback #type:ignore[assignment]
self._root = self.config.root
def grouping(self, flatten: bool = True) \
-> Callable[[GroupingCallback], None]:
@@ -122,28 +44,23 @@ class Watcher:
self.callback = fn
return _decorator
def initialize(self, db: Database) -> None:
def initialize(self, db: 'Database') -> None:
''' Reset internal state. You must initialize before each build! '''
assert callable(self.callback), 'No grouping callback provided.'
self._root = self.config.root
self._model_reader = GroupByModelReader(db, attrib=self.config.key)
self._model_reader = ModelReader(db, self.config.key, self.flatten)
self._state = {} # type: Dict[str, Dict[Record, List[Any]]]
self._group_map = {} # type: Dict[str, List[str]]
self._processed = set() # type: Set[str]
def should_process(self, node: Record) -> bool:
def should_process(self, node: 'Record') -> bool:
''' Check if record path is being watched. '''
return node['_path'].startswith(self._root)
def process(self, record: Record) -> None:
def process(self, record: 'Record') -> None:
'''
Will iterate over all record fields and call the callback method.
Each record is guaranteed to be processed only once.
'''
if record.path in self._processed:
return
self._processed.add(record.path)
for key, field in self._model_reader.read(record, self.flatten):
for key, field in self._model_reader.read(record):
_gen = self.callback(GroupByCallbackArgs(record, key, field))
try:
obj = next(_gen)
@@ -161,10 +78,11 @@ class Watcher:
def _persist(
self,
record: Record,
key: FieldKeyPath,
record: 'Record',
key: 'FieldKeyPath',
obj: Union[str, tuple]
) -> str:
''' Update internal state. Return slugified string. '''
group = obj if isinstance(obj, str) else obj[0]
slug = self.config.slugify(group)
# init group-key
@@ -176,14 +94,14 @@ class Watcher:
# init group extras
if record not in self._state[slug]:
self._state[slug][record] = []
# (optional) append extra
# append extras (or default value)
if isinstance(obj, tuple):
self._state[slug][record].append(obj[1])
else:
self._state[slug][record].append(key.flowKey or key.fieldKey)
self._state[slug][record].append(key.fieldKey)
return slug
def iter_sources(self, root: Record) -> Iterator[GroupBySource]:
def iter_sources(self, root: 'Record') -> Iterator[GroupBySource]:
''' Prepare and yield GroupBySource elements. '''
for key, children in self._state.items():
group = most_used_key(self._group_map[key])
@@ -192,7 +110,6 @@ class Watcher:
del self._model_reader
del self._state
del self._group_map
del self._processed
def __repr__(self) -> str:
return '<GroupByWatcher key="{}" enabled={} callback={}>'.format(