remove ConfigKey and GroupKey types

This commit is contained in:
relikd
2022-04-06 00:29:40 +02:00
parent adb26e343e
commit ebc29459ec
6 changed files with 48 additions and 46 deletions

View File

@@ -1 +1,4 @@
from .config import Config # noqa: F401
from .groupby import GroupBy # noqa: F401
from .plugin import GroupByPlugin # noqa: F401
from .watcher import GroupByCallbackArgs # noqa: F401

View File

@@ -1,9 +1,8 @@
from inifile import IniFile
from lektor.utils import slugify
from typing import NewType, Set, Dict, Optional, Union
from typing import Set, Dict, Optional, Union
ConfigKey = NewType('ConfigKey', str) # attribute of lektor model
AnyConfig = Union['Config', IniFile, Dict]
@@ -18,14 +17,14 @@ class Config:
def __init__(
self,
key: ConfigKey, *,
key: str, *,
root: Optional[str] = None, # default: "/"
slug: Optional[str] = None, # default: "{attr}/{group}/index.html"
template: Optional[str] = None, # default: "groupby-{attr}.html"
) -> None:
self.key = key
self.root = (root or '/').rstrip('/') + '/'
self.slug = slug or f'"{key}/" ~ this.key ~ "/"' # this: GroupBySource
self.slug = slug or (key + '/{key}/') # key = GroupBySource.key
self.template = template or f'groupby-{self.key}.html'
# editable after init
self.enabled = True
@@ -57,7 +56,7 @@ class Config:
return txt + '>'
@staticmethod
def from_dict(key: ConfigKey, cfg: Dict[str, str]) -> 'Config':
def from_dict(key: str, cfg: Dict[str, str]) -> 'Config':
''' Set config fields manually. Allowed: key, root, slug, template. '''
return Config(
key=key,
@@ -67,7 +66,7 @@ class Config:
)
@staticmethod
def from_ini(key: ConfigKey, ini: IniFile) -> 'Config':
def from_ini(key: str, ini: IniFile) -> 'Config':
''' Read and parse ini file. Also adds dependency tracking. '''
cfg = ini.section_as_dict(key) # type: Dict[str, str]
conf = Config.from_dict(key, cfg)
@@ -78,7 +77,7 @@ class Config:
return conf
@staticmethod
def from_any(key: ConfigKey, config: AnyConfig) -> 'Config':
def from_any(key: str, config: AnyConfig) -> 'Config':
assert isinstance(config, (Config, IniFile, Dict))
if isinstance(config, Config):
return config

View File

@@ -4,8 +4,8 @@ from lektor.sourceobj import SourceObject
from lektor.utils import build_url
from typing import Set, Dict, List, Optional, Tuple
from .vobj import GroupBySource, GroupKey
from .config import Config, ConfigKey, AnyConfig
from .vobj import GroupBySource
from .config import Config, AnyConfig
from .watcher import Watcher
@@ -19,13 +19,13 @@ class GroupBy:
def __init__(self) -> None:
self._watcher = [] # type: List[Watcher]
self._results = [] # type: List[GroupBySource]
self._resolver = {} # type: Dict[str, Tuple[GroupKey, Config]]
self._resolver = {} # type: Dict[str, Tuple[str, Config]]
# ----------------
# Add observer
# ----------------
def add_watcher(self, key: ConfigKey, config: AnyConfig) -> Watcher:
def add_watcher(self, key: str, config: AnyConfig) -> Watcher:
''' Init Config and add to watch list. '''
w = Watcher(Config.from_any(key, config))
self._watcher.append(w)

View File

@@ -3,7 +3,7 @@ from lektor.pluginsystem import Plugin # subclass
from lektor.sourceobj import SourceObject # typing
from typing import List, Optional, Iterator
from .vobj import GroupBySource, GroupByBuildProgram, GroupKey, VPATH
from .vobj import GroupBySource, GroupByBuildProgram, VPATH
from .groupby import GroupBy
from .pruner import prune
from .watcher import GroupByCallbackArgs # typing
@@ -39,7 +39,7 @@ class GroupByPlugin(Plugin):
split = config.get(key + '.split') # type: str
@watcher.grouping()
def _fn(args: GroupByCallbackArgs) -> Iterator[GroupKey]:
def _fn(args: GroupByCallbackArgs) -> Iterator[str]:
val = args.field
if isinstance(val, str):
val = val.split(split) if split else [val] # make list

View File

@@ -6,14 +6,13 @@ from lektor.environment import Expression
from lektor.sourceobj import VirtualSourceObject # subclass
from lektor.utils import build_url
from typing import Dict, List, Any, Optional, Iterator, NewType
from typing import Dict, List, Any, Optional, Iterator
from weakref import WeakSet
from .config import Config
from .pruner import track_not_prune
from .util import report_config_error
VPATH = '@groupby' # potentially unsafe. All matching entries are pruned.
GroupKey = NewType('GroupKey', str) # key of group-by
# -----------------------------------
@@ -30,7 +29,7 @@ class GroupBySource(VirtualSourceObject):
def __init__(
self,
record: Record,
group: GroupKey,
group: str,
config: Config,
children: Optional[Dict[Record, List[object]]] = None,
) -> None:
@@ -46,8 +45,11 @@ class GroupBySource(VirtualSourceObject):
self._children[child] = extras
self._reverse_reference_records()
# evaluate slug Expression
self.slug = self._eval(config.slug, field='slug') # type: str
assert self.slug != Ellipsis, 'invalid config: ' + config.slug
if '{key}' in config.slug:
self.slug = config.slug.replace('{key}', self.key)
else:
self.slug = self._eval(config.slug, field='slug')
assert self.slug != Ellipsis, 'invalid config: ' + config.slug
if self.slug and self.slug.endswith('/index.html'):
self.slug = self.slug[:-10]
# extra fields
@@ -98,7 +100,7 @@ class GroupBySource(VirtualSourceObject):
# -----------------------
@property
def children(self):
def children(self) -> Dict[Record, List[object]]:
return self._children
@property

View File

@@ -2,10 +2,10 @@ from lektor.db import Database, Record # typing
from lektor.types.flow import Flow, FlowType
from lektor.utils import bool_from_string
from typing import Set, Dict, List, Tuple, Union, NamedTuple
from typing import Set, Dict, List, Tuple, Any, Union, NamedTuple
from typing import Optional, Callable, Iterable, Iterator, Generator
from .vobj import GroupBySource, GroupKey
from .config import Config, ConfigKey
from .vobj import GroupBySource
from .config import Config
# -----------------------------------
@@ -21,14 +21,12 @@ class FieldKeyPath(NamedTuple):
class GroupByCallbackArgs(NamedTuple):
record: Record
key: FieldKeyPath
field: object # lektor model data-field value
field: Any # lektor model data-field value
GroupByCallbackYield = Union[GroupKey, Tuple[GroupKey, object]]
GroupingCallback = Callable[[GroupByCallbackArgs], Union[
Iterator[GroupByCallbackYield],
Generator[GroupByCallbackYield, Optional[str], None],
Iterator[Union[str, Tuple[str, Any]]],
Generator[Union[str, Tuple[str, Any]], Optional[str], None],
]]
@@ -36,10 +34,10 @@ GroupingCallback = Callable[[GroupByCallbackArgs], Union[
# ModelReader
# -----------------------------------
class ModelReader:
class GroupByModelReader:
''' Find models and flow-models which contain attribute '''
def __init__(self, db: Database, attrib: ConfigKey) -> None:
def __init__(self, db: Database, attrib: str) -> None:
self._flows = {} # type: Dict[str, Set[str]]
self._models = {} # type: Dict[str, Dict[str, str]]
# find flow blocks containing attribute
@@ -67,7 +65,7 @@ class ModelReader:
self,
record: Record,
flatten: bool = False
) -> Iterator[Tuple[FieldKeyPath, object]]:
) -> Iterator[Tuple[FieldKeyPath, Any]]:
'''
Enumerate all fields of a Record with attrib = True.
Flows are either returned directly (flatten=False) or
@@ -97,30 +95,30 @@ class ModelReader:
# State
# -----------------------------------
class State:
''' Holds and updates a groupby build state. '''
class GroupByState:
''' Store and update a groupby build state. {group: {record: [extras]}} '''
def __init__(self) -> None:
self.state = {} # type: Dict[GroupKey, Dict[Record, List[object]]]
self.state = {} # type: Dict[str, Dict[Record, List[Any]]]
self._processed = set() # type: Set[Record]
def __contains__(self, record: Record) -> bool:
''' Returns True if record was already processed. '''
return record.path in self._processed
def items(self) -> Iterable[Tuple[GroupKey, Dict]]:
''' Iterable with (GroupKey, {record: [extras]}) tuples. '''
def items(self) -> Iterable[Tuple[str, Dict[Record, List[Any]]]]:
''' Iterable with (group, {record: [extras]}) tuples. '''
return self.state.items()
def add(self, record: Record, group: Dict[GroupKey, List[object]]) -> None:
''' Append groups if not processed already. '''
def add(self, record: Record, sub_groups: Dict[str, List[Any]]) -> None:
''' Append groups if not processed already. {group: [extras]} '''
if record.path not in self._processed:
self._processed.add(record.path)
for group_key, extras in group.items():
if group_key in self.state:
self.state[group_key][record] = extras
for group, extras in sub_groups.items():
if group in self.state:
self.state[group][record] = extras
else:
self.state[group_key] = {record: extras}
self.state[group] = {record: extras}
# -----------------------------------
@@ -130,7 +128,7 @@ class State:
class Watcher:
'''
Callback is called with (Record, FieldKeyPath, field-value).
Callback may yield one or more (group-key, extra-info) tuples.
Callback may yield one or more (group, extra-info) tuples.
'''
def __init__(self, config: Config) -> None:
@@ -144,7 +142,7 @@ class Watcher:
Decorator to subscribe to attrib-elements.
If flatten = False, dont explode FlowType.
(record, field-key, field) -> (group-key, extra-info)
(record, field-key, field) -> (group, extra-info)
'''
def _decorator(fn: GroupingCallback) -> None:
self.flatten = flatten
@@ -155,8 +153,8 @@ class Watcher:
''' Reset internal state. You must initialize before each build! '''
assert callable(self.callback), 'No grouping callback provided.'
self._root = self.config.root
self._state = State()
self._model_reader = ModelReader(db, attrib=self.config.key)
self._state = GroupByState()
self._model_reader = GroupByModelReader(db, attrib=self.config.key)
def should_process(self, node: Record) -> bool:
''' Check if record path is being watched. '''
@@ -170,7 +168,7 @@ class Watcher:
'''
if record in self._state:
return
tmp = {} # type: Dict[GroupKey, List[object]]
tmp = {} # type: Dict[str, List[Any]] # {group: [extras]}
for key, field in self._model_reader.read(record, self.flatten):
_gen = self.callback(GroupByCallbackArgs(record, key, field))
try: