ref: class SymLinks
This commit is contained in:
153
brew.py
153
brew.py
@@ -568,11 +568,7 @@ def cli_cleanup(args: ArgParams) -> None:
|
||||
|
||||
# should never happen but just in case, remove symlinks which point nowhere
|
||||
Log.info('==> Removing dead links')
|
||||
links = Cellar.allBinLinks() + Cellar.allOptLinks()
|
||||
if args.packages:
|
||||
deadPaths = [pkg.path + '/' for pkg in packages]
|
||||
links = [lnk for lnk in links
|
||||
if any(lnk.target.startswith(x) for x in deadPaths)]
|
||||
links = SymLinks.inCellar(args.packages, opt=True, bin=True)
|
||||
|
||||
for link in links:
|
||||
if not os.path.exists(link.target):
|
||||
@@ -1158,26 +1154,88 @@ class DependencyTree:
|
||||
|
||||
|
||||
# -----------------------------------
|
||||
# LinkTarget
|
||||
# SymLinks
|
||||
# -----------------------------------
|
||||
|
||||
class LinkTarget(NamedTuple):
|
||||
path: str
|
||||
target: str # absolute path
|
||||
raw: str = '' # relative target
|
||||
class SymLinks:
|
||||
class Link(NamedTuple):
|
||||
path: str
|
||||
target: str # absolute path
|
||||
raw: str # relative target
|
||||
|
||||
@staticmethod
|
||||
def read(filePath: str) -> 'LinkTarget|None':
|
||||
''' Read a single symlink and populate with absolute paths '''
|
||||
if not os.path.islink(filePath):
|
||||
return None
|
||||
raw = os.readlink(filePath)
|
||||
real = os.path.realpath(os.path.join(os.path.dirname(filePath), raw))
|
||||
return LinkTarget(filePath, real, raw)
|
||||
def _allInDir(path: str) -> Iterator[Link]:
|
||||
''' Recursively search `path` for symlinks '''
|
||||
for base, dirs, files in os.walk(path):
|
||||
for entry in dirs + files:
|
||||
filePath = os.path.join(base, entry)
|
||||
if os.path.islink(filePath):
|
||||
raw = os.readlink(filePath)
|
||||
real = os.path.realpath(os.path.join(base, raw))
|
||||
yield SymLinks.Link(filePath, real, raw)
|
||||
|
||||
@staticmethod
|
||||
def allInDir(path: str) -> 'list[LinkTarget]':
|
||||
return [x for f in os.scandir(path) if (x := LinkTarget.read(f.path))]
|
||||
def forInstall(pkgPath: str, *, opt: bool, bin: bool) \
|
||||
-> list[Link]:
|
||||
'''
|
||||
Collection of symlinks which can be linked into Cellar.
|
||||
Will iterate over files inside of package `@/Cellar/<pkg>/<ver>/...`
|
||||
'''
|
||||
rv = []
|
||||
pkgName = os.path.basename(os.path.dirname(pkgPath))
|
||||
optLinkPath = os.path.join(Cellar.OPT, pkgName)
|
||||
|
||||
def _fn(where: str, what: str) -> None:
|
||||
# dynamic redirect. link on opt-link instead of direct file
|
||||
path = os.path.join(where, what)
|
||||
target = os.path.join(optLinkPath, os.path.basename(where), what)
|
||||
raw = os.path.relpath(target, os.path.dirname(path))
|
||||
rv.append(SymLinks.Link(path, target, raw))
|
||||
|
||||
if opt:
|
||||
raw = os.path.relpath(pkgPath, Cellar.OPT)
|
||||
rv.append(SymLinks.Link(optLinkPath, pkgPath + '/', raw))
|
||||
|
||||
if bin and os.path.isdir(binDir := os.path.join(pkgPath, 'bin')):
|
||||
for exe in os.scandir(binDir):
|
||||
if os.access(exe, os.X_OK): # executable flag
|
||||
_fn(Cellar.BIN, exe.name)
|
||||
|
||||
return rv
|
||||
|
||||
@staticmethod
|
||||
def inCellar(
|
||||
pkgs: 'list[str]|str' = '', *, opt: bool, bin: bool,
|
||||
) -> list[Link]:
|
||||
'''
|
||||
Return existing symlinks in `@/opt`, `@/bin`.
|
||||
Optionally, filter links by package name.
|
||||
'''
|
||||
rv = [] # type: list[SymLinks.Link]
|
||||
if opt:
|
||||
rv += SymLinks._allInDir(Cellar.OPT)
|
||||
if bin:
|
||||
rv += SymLinks._allInDir(Cellar.BIN)
|
||||
|
||||
if not pkgs:
|
||||
return rv
|
||||
|
||||
filtered = []
|
||||
if isinstance(pkgs, str): # special case for one, because it's faster
|
||||
prefix = Cellar.installPath(pkgs)
|
||||
for lnk in rv:
|
||||
if lnk.target.startswith(prefix):
|
||||
filtered.append(lnk)
|
||||
return filtered
|
||||
|
||||
haystack = set(pkgs)
|
||||
for lnk in rv:
|
||||
withoutPath = lnk.target.removeprefix(Cellar.CELLAR)
|
||||
if lnk.target != withoutPath: # inside of cellar
|
||||
pkgName = withoutPath.lstrip('/').split('/', 1)[0]
|
||||
if pkgName in haystack:
|
||||
filtered.append(lnk)
|
||||
return filtered
|
||||
|
||||
|
||||
# -----------------------------------
|
||||
@@ -1293,32 +1351,27 @@ class LocalPackage:
|
||||
# Symlink processing
|
||||
|
||||
@cached_property
|
||||
def optLink(self) -> 'LinkTarget|None':
|
||||
def optLink(self) -> 'SymLinks.Link|None':
|
||||
''' Read `@/opt/<pkg>` link. `None` if non-exist or not link to pkg '''
|
||||
# TODO: should opt-links have "@version" suffix or not?
|
||||
# if no, fix-dylib needs adjustments
|
||||
lnk = LinkTarget.read(os.path.join(Cellar.OPT, self.name))
|
||||
lnk = SymLinks.inCellar(self.name, opt=True, bin=False)[0]
|
||||
if lnk and not lnk.target.startswith(self.path + '/'):
|
||||
return None
|
||||
return lnk
|
||||
|
||||
@cached_property
|
||||
def binLinks(self) -> list[LinkTarget]:
|
||||
def binLinks(self) -> list[SymLinks.Link]:
|
||||
''' List of `@/bin/...` links that match `<pkg>` destination '''
|
||||
return [lnk for lnk in Cellar.allBinLinks()
|
||||
if lnk.target.startswith(self.path + '/')]
|
||||
return SymLinks.inCellar(self.name, opt=False, bin=True)
|
||||
|
||||
def unlink(
|
||||
self, *, unlinkOpt: bool, unlinkBin: bool,
|
||||
dryRun: bool = False, quiet: bool = False,
|
||||
) -> list[LinkTarget]:
|
||||
) -> list[SymLinks.Link]:
|
||||
''' remove symlinks `@/opt/<pkg>` and `@/bin/...` matching target '''
|
||||
rv = []
|
||||
if unlinkBin:
|
||||
rv += self.binLinks
|
||||
|
||||
if unlinkOpt:
|
||||
rv += filter(None, [self.optLink])
|
||||
rv = SymLinks.inCellar(
|
||||
self.name, opt=unlinkOpt, bin=unlinkBin)
|
||||
|
||||
for lnk in rv:
|
||||
if not quiet:
|
||||
@@ -1387,14 +1440,6 @@ class LocalPackageVersion:
|
||||
|
||||
# Symlink processing
|
||||
|
||||
@cached_property
|
||||
def _gatherBinaries(self) -> list[str]:
|
||||
''' Binary paths in `@/Cellar/<pkg>/<version>/bin/...` '''
|
||||
path = os.path.join(self.path, 'bin')
|
||||
if os.path.isdir(path):
|
||||
return [x.path for x in os.scandir(path) if os.access(x, os.X_OK)]
|
||||
return []
|
||||
|
||||
def link(
|
||||
self, *, linkOpt: bool, linkBin: bool,
|
||||
dryRun: bool = False, quiet: bool = False,
|
||||
@@ -1403,28 +1448,16 @@ class LocalPackageVersion:
|
||||
if not self.installed:
|
||||
raise RuntimeError('Package not installed')
|
||||
|
||||
queue = []
|
||||
optLinkPath = os.path.join(Cellar.OPT, self.pkg.name)
|
||||
|
||||
if linkOpt:
|
||||
queue.append(LinkTarget(optLinkPath, self.path + '/'))
|
||||
|
||||
for exePath in self._gatherBinaries if linkBin else []:
|
||||
# dynamic link on opt instead of direct
|
||||
dynLink = exePath.replace(self.path, optLinkPath, 1)
|
||||
queue.append(LinkTarget(
|
||||
os.path.join(Cellar.BIN, os.path.basename(exePath)), dynLink))
|
||||
|
||||
for link in queue:
|
||||
relTgt = os.path.relpath(link.target, os.path.dirname(link.path))
|
||||
for link in SymLinks.forInstall(
|
||||
self.path, opt=linkOpt, bin=linkBin):
|
||||
short = Cellar.shortPath(link.path)
|
||||
if os.path.islink(link.path) or os.path.exists(link.path):
|
||||
Log.warn(f'skip already existing link: {short}', summary=True)
|
||||
else:
|
||||
if not quiet:
|
||||
Log.info(f' link {short} -> {relTgt}')
|
||||
Log.info(f' link {short} -> {link.raw}')
|
||||
if not dryRun:
|
||||
os.symlink(relTgt, link.path)
|
||||
os.symlink(link.raw, link.path)
|
||||
|
||||
if not dryRun:
|
||||
self.pkg._resetCachedProperty(optLink=linkOpt, binLink=linkBin)
|
||||
@@ -1640,16 +1673,6 @@ class Cellar:
|
||||
)
|
||||
return DependencyTree(forward)
|
||||
|
||||
@staticmethod
|
||||
def allBinLinks() -> list[LinkTarget]:
|
||||
''' List of all `@/bin/...` links '''
|
||||
return LinkTarget.allInDir(Cellar.BIN)
|
||||
|
||||
@staticmethod
|
||||
def allOptLinks() -> list[LinkTarget]:
|
||||
''' List of all `@/opt/...` links '''
|
||||
return LinkTarget.allInDir(Cellar.OPT)
|
||||
|
||||
@staticmethod
|
||||
def shortPath(path: str) -> str:
|
||||
''' Return truncated path (relative to `Cellar.ROOT`) '''
|
||||
|
||||
Reference in New Issue
Block a user