feat: LocalPackage

This commit is contained in:
relikd
2025-09-01 20:29:17 +02:00
parent 06c398fdd0
commit 6c959ba298

640
brew.py
View File

@@ -22,6 +22,7 @@ from tarfile import TarInfo, open as openTarfile
from urllib import request as Req # build_opener, install_opener, urlretrieve
from urllib.error import HTTPError
from webbrowser import open as launchBrowser
from functools import cached_property
from argparse import (
ArgumentParser, Action, BooleanOptionalAction,
Namespace as ArgParams,
@@ -82,21 +83,21 @@ def cli_info(args: ArgParams) -> None:
return
Log.info('Package:', args.package)
info = Cellar.info(args.package)
Log.info('Installed:', 'yes' if info.installed else 'no')
pkg = LocalPackage(args.package)
Log.info('Installed:', 'yes' if pkg.installed else 'no')
# local information
if info.installed:
Log.info(' Active version:', info.verActive or '')
Log.info(' Inactive versions:', ', '.join(info.verInactive) or '')
if pkg.installed:
Log.info(' Active version:', pkg.activeVersion or '')
Log.info(' Inactive versions:', ', '.join(pkg.inactiveVersions) or '')
ver = args.version or info.verActive
ver = args.version or pkg.activeVersion
if ver:
Log.info(f' Dependencies[{ver}]:')
if ver not in info.verAll:
if ver not in pkg.allVersions:
Log.info(' <not installed>')
else:
localDeps = Cellar.getDependencies(args.package, ver)
localDeps = pkg.getDependencies(ver)
Log.info(' ', ', '.join(sorted(localDeps)) or '<none>')
Log.info()
@@ -133,7 +134,7 @@ def cli_info(args: ArgParams) -> None:
# https://docs.brew.sh/Manpage#home-homepage---formula---cask-formulacask-
def cli_home(args: ArgParams) -> None:
''' Open a project's homepage in a browser. '''
url = Cellar.getHomepageUrl(args.package)
url = LocalPackage(args.package).homepageUrl
if not url:
if not Utils.ask('package not installed. Search online?'):
return
@@ -186,23 +187,23 @@ def cli_fetch(args: ArgParams) -> None:
# https://docs.brew.sh/Manpage#list-ls-options-installed_formulainstalled_cask-
def cli_list(args: ArgParams) -> None:
''' List installed packages. '''
infos = Cellar.infoAll(assertInstalled=True)
packages = Cellar.infoAll(assertInstalled=True)
if args.multiple:
infos = [x for x in infos if len(x.verAll) > 1]
packages = [x for x in packages if len(x.allVersions) > 1]
if args.pinned:
infos = [x for x in infos if x.pinned]
if not infos:
packages = [x for x in packages if x.pinned]
if not packages:
Log.main('no package found.')
return
if args.versions:
for info in infos:
txt = '{}: {}'.format(info.package, info.verActive or 'not linked')
if info.verInactive:
txt += ' ({})'.format(', '.join(info.verInactive))
for pkg in packages:
txt = '{}: {}'.format(pkg.name, pkg.activeVersion or 'not linked')
if pkg.inactiveVersions:
txt += ' ({})'.format(', '.join(pkg.inactiveVersions))
Log.main(txt)
else:
Utils.printInColumns([x.package for x in infos],
Utils.printInColumns([x.name for x in packages],
plainList=not Env.IS_TTY or args.__dict__['1'])
@@ -284,10 +285,10 @@ def cli_missing(args: ArgParams) -> None:
# https://docs.brew.sh/Manpage#install-options-formulacask-
def cli_install(args: ArgParams) -> None:
''' Install a package with all dependencies. '''
local = Cellar.info(args.package)
if local.installed and not args.force:
Log.info(args.package, 'already installed, checking for newer version')
Brew.checkUpdate(args.package)
pkg = LocalPackage(args.package)
if pkg.installed and not args.force:
Log.info(pkg.name, 'already installed, checking for newer version')
pkg.checkUpdate()
return
queue = InstallQueue(dryRun=args.dry_run, force=args.force)
@@ -323,77 +324,76 @@ def cli_uninstall(args: ArgParams) -> None:
# https://docs.brew.sh/Manpage#link-ln-options-installed_formula-
def cli_link(args: ArgParams) -> None:
''' Link a specific package version (activate). '''
info = Cellar.info(args.package, assertInstalled=True)
if info.verActive:
pkg = LocalPackage(args.package).assertInstalled()
if pkg.activeVersion:
# must unlink before relinking (except --bin)
if args.bin:
args.version = info.verActive
args.version = pkg.activeVersion
else:
Log.error(f'already linked to {info.verActive}. Unlink first.')
Log.error(f'already linked to {pkg.activeVersion}. Unlink first.')
return
# auto-fill version if there is only one version
if not args.version:
if len(info.verAll) == 1:
args.version = info.verAll[0]
if len(pkg.allVersions) == 1:
args.version = pkg.allVersions[0]
else:
Log.info('Multiple versions found:')
Log.info(Utils.prettyList(info.verAll))
Log.info(Utils.prettyList(pkg.allVersions))
Log.error('no package version provided.')
return
# check if package is really installed
if args.version not in info.verAll:
if args.version not in pkg.allVersions:
Log.error('package version', args.version, 'not found')
return
if not args.force and Cellar.isKegOnly(args.package, args.version):
if not args.force and pkg.isKegOnly(args.version):
Log.error(args.package, 'is keg-only. Use -f to force linking.')
return
# perform link
Cellar.linkPackage(args.package, args.version,
noExe=args.no_bin, dryRun=args.dry_run)
pkg.link(args.version, noExe=args.no_bin, dryRun=args.dry_run)
Log.main('==> Linked to', args.version)
# https://docs.brew.sh/Manpage#unlink---dry-run-installed_formula-
def cli_unlink(args: ArgParams) -> None:
''' Remove symlinks for package to (temporarily) disable it. '''
info = Cellar.info(args.package, assertInstalled=True)
if not info.verActive:
pkg = LocalPackage(args.package).assertInstalled()
if not pkg.activeVersion:
Log.error(args.package, 'is not active')
return
# perform unlink
Cellar.unlinkPackage(args.package, onlyExe=args.bin, dryRun=args.dry_run)
Log.main('==> Unlinked', info.verActive)
pkg.unlink(onlyExe=args.bin, dryRun=args.dry_run)
Log.main('==> Unlinked', pkg.activeVersion)
def cli_switch(args: ArgParams) -> None:
''' Change package version. '''
info = Cellar.info(args.package, assertInstalled=True)
if not info.verActive:
pkg = LocalPackage(args.package).assertInstalled()
if not pkg.activeVersion:
Log.error('cannot switch, package is not active')
return
if info.verActive == args.version:
Log.main('already on', info.verActive)
if pkg.activeVersion == args.version:
Log.main('already on', pkg.activeVersion)
return
# convenience toggle
if not args.version and len(info.verInactive) == 1:
args.version = info.verInactive[0]
if not args.version and len(pkg.inactiveVersions) == 1:
args.version = pkg.inactiveVersions[0]
# convenience list print
if not args.version:
Log.info('Available versions:')
Utils.printInColumns(info.verAll, prefix=' ')
Utils.printInColumns(pkg.allVersions, prefix=' ')
Log.error('no version provided')
return
noBinsLinks = not Cellar.getBinLinks(args.package)
Cellar.unlinkPackage(args.package, onlyExe=False)
Cellar.linkPackage(args.package, args.version, noExe=noBinsLinks)
noBinsLinks = not pkg.binLinks
pkg.unlink(onlyExe=False)
pkg.link(args.version, noExe=noBinsLinks)
Log.main('==> switched to version', args.version)
if noBinsLinks:
Log.warn('no binary links found. Skipped for new version as well.')
@@ -402,17 +402,17 @@ def cli_switch(args: ArgParams) -> None:
# https://docs.brew.sh/Manpage#pin-installed_formula-
def cli_pin(args: ArgParams) -> None:
''' Prevent specified packages from being upgraded. '''
for info in Cellar.infoAll(args.packages, assertInstalled=True):
if Cellar.pinPackage(info.package):
Log.info('pinned', info.package)
for pkg in Cellar.infoAll(args.packages, assertInstalled=True):
if pkg.pin(True):
Log.info('pinned', pkg.name)
# https://docs.brew.sh/Manpage#unpin-installed_formula-
def cli_unpin(args: ArgParams) -> None:
''' Allow specified packages to be upgraded. '''
for info in Cellar.infoAll(args.packages, assertInstalled=True):
if Cellar.pinPackage(info.package, False):
Log.info('unpinned', info.package)
for pkg in Cellar.infoAll(args.packages, assertInstalled=True):
if pkg.pin(False):
Log.info('unpinned', pkg.name)
# https://docs.brew.sh/Manpage#cleanup-options-formulacask-
@@ -424,8 +424,8 @@ def cli_cleanup(args: ArgParams) -> None:
This can be adjusted with $BREW_PY_CLEANUP_MAX_AGE_DAYS.
'''
total_savings = 0
infos = Cellar.infoAll(args.packages, assertInstalled=True)
if not infos:
packages = Cellar.infoAll(args.packages, assertInstalled=True)
if not packages:
Log.error('no package found')
return
@@ -438,16 +438,16 @@ def cli_cleanup(args: ArgParams) -> None:
# remove all non-active versions
Log.info('==> Removing old versions')
for info in infos:
for ver in info.verInactive:
if Cellar.isKegOnly(info.package, ver):
for pkg in packages:
for ver in pkg.inactiveVersions:
if pkg.isKegOnly(ver):
continue
path = Cellar.installPath(info.package, ver)
path = os.path.join(pkg.path, ver)
total_savings += File.remove(path, dryRun=args.dry_run)
# should never happen but just in case, remove symlinks which point nowhere
Log.info('==> Removing dead links')
binLinks = Cellar.getBinLinks()
binLinks = Cellar.allBinLinks()
if args.packages:
deadPaths = set(Cellar.installPath(x) + '/' for x in args.packages)
binLinks = [x for x in binLinks
@@ -910,6 +910,244 @@ class DependencyTree:
return set(self.reverse).difference(self.forward)
# -----------------------------------
# LinkTarget
# -----------------------------------
class LinkTarget(NamedTuple):
path: str
target: str # absolute path
raw: str = '' # relative target
@staticmethod
def read(filePath: str, startswith: str = '') -> 'LinkTarget|None':
''' Read a single symlink and populate with absolute paths '''
if not os.path.islink(filePath):
return None
raw = os.readlink(filePath)
real = os.path.realpath(os.path.join(os.path.dirname(filePath), raw))
if real.startswith(startswith or ''):
return LinkTarget(filePath, real, raw)
return None
# -----------------------------------
# LocalPackage
# -----------------------------------
class LocalPackage:
'''
Most properties are cached. Throw away your instance after (un-)install.
'''
def __init__(self, pkg: str) -> None:
self.name = pkg
self.path = Cellar.installPath(pkg)
def assertInstalled(self, msg: str = 'unknown package:') -> 'LocalPackage':
'''If not installed: print error message and exit with status code 1'''
if not self.installed:
Log.error(msg, self.name)
exit(1)
return self
def checkUpdate(self, *, force: bool = False) -> None:
''' Print whether package is up-to-date or needs upgrade '''
if self.installed:
onlineVersion = Brew.info(self.name, force=force).version
if onlineVersion in self.allVersions:
Log.info('package is up to date.')
else:
Log.info(' * upgrade available {} (installed: {})'.format(
onlineVersion, ', '.join(self.allVersions)))
# Ruby file processing
def rubyPath(self, version: str) -> str:
''' Returns `@/cellar/<pkg>/<version>/.brew/<pkg>.rb` '''
return os.path.join(self.path, version, '.brew', self.name + '.rb')
@cached_property
def homepageUrl(self) -> 'str|None':
''' Extract homepage url from ruby file '''
version = self.activeVersion or ([None] + self.allVersions)[-1]
if version:
return RubyParser(self.rubyPath(version)).parseHomepageUrl()
return None
def getDependencies(self, version: str) -> set[str]:
''' Extract dependencies from ruby file '''
assert version, 'version is required'
return RubyParser(self.rubyPath(version)).parse().dependencies
def isKegOnly(self, version: str) -> bool:
''' Check if package is keg-only '''
return RubyParser(self.rubyPath(version)).parseKegOnly()
# Versions
@cached_property
def activeVersion(self) -> 'str|None':
''' Returns currently active version (if opt-link is set) '''
return os.path.basename(self.optLink.target) if self.optLink else None
@cached_property
def allVersions(self) -> list[str]:
''' All installed versions '''
rv = []
if os.path.isdir(self.path):
for ver in sorted(os.listdir(self.path)):
if os.path.isdir(os.path.join(self.path, ver, '.brew')):
rv.append(ver)
return rv
@cached_property
def installed(self) -> bool:
''' Returns `True` if at least one version is installed '''
return len(self.allVersions) > 0
@cached_property
def inactiveVersions(self) -> list[str]:
''' Versions which are currently not active (not opt-linked) '''
return [x for x in self.allVersions if x != self.activeVersion]
# Custom config files
@cached_property
def pinned(self) -> bool:
''' Returns `True` if `.pinned` file exists '''
return os.path.exists(os.path.join(self.path, '.pinned'))
def pin(self, flag: bool) -> bool:
''' Create or delete `.pinned` file. Returns `False` if no change. '''
assert os.path.isdir(self.path), 'Package must be installed to (un)pin'
if changes := flag ^ self.pinned:
del self.pinned # clear cached_property
if flag:
File.touch(os.path.join(self.path, '.pinned'))
else:
os.remove(os.path.join(self.path, '.pinned'))
return changes
@cached_property
def primary(self) -> bool:
''' Returns `False` if package was installed (only) as a dependency '''
return os.path.exists(os.path.join(self.path, '.primary'))
def setPrimary(self, flag: bool) -> None:
''' Create `.primary` (main pkg) or `.secondary` (dependency) file '''
fname = os.path.join(self.path, '.primary' if flag else '.secondary')
if flag:
self.__dict__.pop('primary', None) # clear cached_property
File.touch(fname)
def setDigest(self, version: str, digest: str) -> None:
''' Copy digest of tar file into install dir '''
digest_file = os.path.join(self.path, version, '.brew', 'digest')
with open(digest_file, 'w') as fp:
fp.write(digest)
# Symlink processing
def readOptLink(self, *, ensurePkg: bool) -> 'LinkTarget|None':
''' Read `@/opt/<pkg>` link. Returns `None` if non-exist '''
pkgPath = (self.path + '/') if ensurePkg else ''
return LinkTarget.read(os.path.join(Cellar.OPT, self.name), pkgPath)
@cached_property
def optLink(self) -> 'LinkTarget|None':
''' Return `OPT` link but only if it links to the current package '''
return self.readOptLink(ensurePkg=True)
@cached_property
def binLinks(self) -> list[LinkTarget]:
''' List of `@/bin/...` links that match `<pkg>` destination '''
return Cellar.allBinLinks(self.path + '/')
def unlink(
self, *,
onlyExe: bool = False, dryRun: bool = False, quiet: bool = False,
) -> list[LinkTarget]:
''' remove symlinks `@/opt/<pkg>` and `@/bin/...` matching target '''
rv = []
rv += self.binLinks
del self.binLinks # clear cached_property
if not onlyExe:
rv += filter(None, [self.readOptLink(ensurePkg=False)])
self.__dict__.pop('optLink', None) # clear cached_property
for lnk in rv:
if not quiet:
Log.info(f' unlink {Cellar.shortPath(lnk.path)} -> {lnk.raw}')
if not dryRun:
os.remove(lnk.path)
return rv
def _gatherBinaries(self, version: str) -> list[str]:
''' Binary paths in `cellar/<pkg>/<version>/bin/...` '''
path = os.path.join(self.path, version, 'bin')
if os.path.isdir(path):
return [x.path for x in os.scandir(path) if os.access(x, os.X_OK)]
return []
def link(
self, version: str, *, noExe: bool = False, dryRun: bool = False,
) -> None:
''' create symlinks `@/opt/<pkg>` and `@/bin/...` matching target '''
assert version, 'version is required'
verRoot = os.path.join(self.path, version)
if not os.path.isdir(verRoot):
raise RuntimeError('Package not installed')
if not dryRun:
self.__dict__.pop('optLink', None) # clear cached_property
if not noExe:
self.__dict__.pop('binLinks', None) # clear cached_property
verLink = os.path.join(Cellar.OPT, self.name)
queue = [LinkTarget(verLink, verRoot + '/')]
for exePath in [] if noExe else self._gatherBinaries(version):
# dynamic link on opt instead of direct
dynLink = exePath.replace(verRoot, verLink, 1)
queue.append(LinkTarget(
os.path.join(Cellar.BIN, os.path.basename(exePath)), dynLink))
for link in queue:
relTgt = os.path.relpath(link.target, os.path.dirname(link.path))
short = Cellar.shortPath(link.path)
if os.path.islink(link.path) or os.path.exists(link.path):
Log.warn(f'skip already existing link: {short}', summary=True)
else:
Log.info(f' link {short} -> {relTgt}')
if not dryRun:
os.symlink(relTgt, link.path)
# Post-install fix
def fix(self, version: str) -> None:
''' Re-link dylibs and fix time of symlinks '''
verRoot = os.path.join(self.path, version)
if not os.path.isfile(self.rubyPath(version)):
Log.error('not a brew-package directory', verRoot, summary=True)
return
for base, dirs, files in os.walk(verRoot):
for file in files:
fname = os.path.join(base, file)
if os.path.islink(fname):
Fixer.symlink(fname)
continue
# magic number check for Mach-O
with open(fname, 'rb') as fp:
if fp.read(4) != b'\xcf\xfa\xed\xfe':
continue
Fixer.dylib(fname, self.name, version)
# -----------------------------------
# Remote logic
# -----------------------------------
@@ -980,18 +1218,6 @@ class Brew:
auth = Brew._ghcrAuth(pkg)
return ApiGhcr.tags(auth, pkg, force=force)['tags']
@staticmethod
def checkUpdate(pkg: str, *, force: bool = False) -> None:
''' Print whether package is up-to-date or needs upgrade '''
local = Cellar.info(pkg)
if local.installed:
onlineVersion = Brew.info(pkg, force=force).version
if onlineVersion in local.verAll:
Log.info('package is up to date.')
else:
Log.info(' * upgrade available {} (installed: {})'.format(
onlineVersion, ', '.join(local.verAll)))
@staticmethod
def downloadBottle(
pkg: str, version: str, digest: str,
@@ -1070,194 +1296,44 @@ class Cellar:
return os.path.join(Cellar.CELLAR, pkg)
return os.path.join(Cellar.CELLAR, pkg, version)
@staticmethod
def configPath(pkg: str, version: str, fileName: str) -> str:
''' Returns `@/cellar/<pkg>/<version>/.brew/<fileName>` '''
pkgRoot = Cellar.installPath(pkg, version)
return os.path.join(pkgRoot, '.brew', fileName)
@staticmethod
def rubyPath(pkg: str, version: str) -> str:
''' Returns `@/cellar/<pkg>/<version>/.brew/<pkg>.rb` '''
return Cellar.configPath(pkg, version, pkg + '.rb')
# Version handling
class PackageInfo(NamedTuple):
package: str
installed: bool
pinned: bool
verActive: Optional[str]
verInactive: list[str]
verAll: list[str]
@staticmethod
def info(pkg: str, *, assertInstalled: bool = False) -> PackageInfo:
''' Info about active and available package versions '''
optLink = Cellar.getOptLink(pkg, ensurePkg=True)
active = os.path.basename(optLink.target) if optLink else None
inactive = []
available = []
pkgPath = Cellar.installPath(pkg)
isPinned = os.path.exists(os.path.join(pkgPath, '.pinned'))
if os.path.isdir(pkgPath):
for ver in sorted(os.listdir(pkgPath)):
if os.path.isdir(os.path.join(pkgPath, ver, '.brew')):
available.append(ver)
if ver != active:
inactive.append(ver)
isInstalled = len(available) > 0
# hard-fail if asserting for installed
if assertInstalled and not isInstalled:
Log.error('unknown package:', pkg)
exit(1)
return Cellar.PackageInfo(
pkg, isInstalled, isPinned, active, inactive, available)
@staticmethod
def infoAll(filterPkg: list[str] = [], *, assertInstalled: bool = False) \
-> list[PackageInfo]:
-> list[LocalPackage]:
''' List all installed packages (already checked for `.installed`) '''
pkgs = filterPkg if filterPkg else sorted(os.listdir(Cellar.CELLAR))
infos = [info for pkg in pkgs if (info := Cellar.info(pkg)).installed]
infos = [x for pkg in pkgs if (x := LocalPackage(pkg)).installed]
# hard-fail if asserting for installed
if assertInstalled and filterPkg and len(pkgs) != len(infos):
unkownPkgs = set(pkgs) - set(x.package for x in infos)
unkownPkgs = set(pkgs) - set(x.name for x in infos)
Log.error('unknown package:', ', '.join(sorted(unkownPkgs)))
exit(1)
return infos
@staticmethod
def pinPackage(pkg: str, pin: bool = True) -> bool:
pkgPath = Cellar.installPath(pkg)
assert os.path.isdir(pkgPath), 'Package must be installed to (un-)pin'
pin_file = os.path.join(pkgPath, '.pinned')
changed = pin ^ os.path.exists(pin_file)
if pin:
File.touch(pin_file)
elif os.path.exists(pin_file):
os.remove(pin_file)
return changed
@staticmethod
def getDependencyTree() -> DependencyTree:
''' Returns dict object for dependency traversal '''
forward = TreeDict()
for info in Cellar.infoAll(): # must always go over all, no filters
forward.direct[info.package] = set(
for pkg in Cellar.infoAll(): # must always go over all, no filters
forward.direct[pkg.name] = set(
dep
for ver in info.verAll
for dep in Cellar.getDependencies(info.package, ver) or []
for ver in pkg.allVersions
for dep in pkg.getDependencies(ver)
)
return DependencyTree(forward)
# Symlink processing
class LinkTarget(NamedTuple):
path: str
target: str # absolute path
raw: str # relative target
@staticmethod
def allBinLinks(matching: str = '') -> list[LinkTarget]:
''' List of all `@/bin/...` links with `<matching>` destination '''
return [lnk for file in os.scandir(Cellar.BIN)
if (lnk := LinkTarget.read(file.path, matching))]
@staticmethod
def _readLink(filePath: str, startswith: str = '') -> 'LinkTarget|None':
''' Read a single symlink and populate with absolute paths '''
if not os.path.islink(filePath):
return None
raw = os.readlink(filePath)
real = os.path.realpath(os.path.join(os.path.dirname(filePath), raw))
if real.startswith(startswith or ''):
return Cellar.LinkTarget(filePath, real, raw)
return None
@staticmethod
def getOptLink(pkg: str, *, ensurePkg: bool) -> 'LinkTarget|None':
''' Read `@/opt/<pkg>` link. Returns `None` if non-exist '''
pkgPath = (Cellar.installPath(pkg) + '/') if ensurePkg else ''
return Cellar._readLink(os.path.join(Cellar.OPT, pkg), pkgPath)
@staticmethod
def getBinLinks(pkg: 'str|None' = None) -> list[LinkTarget]:
''' List of `@/bin/...` links that match `<pkg>` destination '''
pkgPath = (Cellar.installPath(pkg) + '/') if pkg else ''
rv = []
for file in os.listdir(Cellar.BIN):
lnk = Cellar._readLink(os.path.join(Cellar.BIN, file), pkgPath)
if lnk:
rv.append(lnk)
return rv
@staticmethod
def unlinkPackage(
pkg: str, *,
onlyExe: bool = False, dryRun: bool = False, quiet: bool = False,
) -> list[LinkTarget]:
''' remove symlinks `@/opt/<pkg>` and `@/bin/...` matching target '''
rv = Cellar.getBinLinks(pkg)
if not onlyExe:
rv += filter(None, [Cellar.getOptLink(pkg, ensurePkg=False)])
for lnk in rv:
shortPath = os.path.relpath(lnk.path, Cellar.ROOT)
if not quiet:
Log.info(f' unlink {shortPath} -> {lnk.raw}')
if not dryRun:
os.remove(lnk.path)
return rv
@staticmethod
def _gatherBinaries(pkg: str, version: str) -> list[str]:
''' Binary names (not paths) in `cellar/<pkg>/<version>/bin/...` '''
path = os.path.join(Cellar.installPath(pkg, version), 'bin')
if os.path.isdir(path):
return [x.name for x in os.scandir(path) if os.access(x, os.X_OK)]
return []
@staticmethod
def linkPackage(
pkg: str, version: str, *, noExe: bool = False, dryRun: bool = False
) -> None:
''' create symlinks `@/opt/<pkg>` and `@/bin/...` matching target '''
assert version, 'version is required'
pkgRoot = Cellar.installPath(pkg, version)
if not os.path.isdir(pkgRoot):
raise RuntimeError('Package not installed')
def ln(path: str, linkTarget: str) -> None:
short = os.path.relpath(path, Cellar.ROOT)
if os.path.islink(path) or os.path.exists(path):
Log.warn(f'skip already existing link: {short}', summary=True)
else:
Log.info(f' link {short} -> {linkTarget}')
if not dryRun:
os.symlink(linkTarget, path)
ln(os.path.join(Cellar.OPT, pkg), f'../cellar/{pkg}/{version}/')
if not noExe:
for exe in Cellar._gatherBinaries(pkg, version):
ln(os.path.join(Cellar.BIN, exe), f'../opt/{pkg}/bin/{exe}')
# Ruby file processing
@staticmethod
def getDependencies(pkg: str, version: str) -> set[str]:
''' Extract dependencies from ruby file '''
assert version, 'version is required'
return RubyParser(Cellar.rubyPath(pkg, version)).parse().dependencies
@staticmethod
def getHomepageUrl(pkg: str) -> 'str|None':
''' Extract homepage url from ruby file '''
info = Cellar.info(pkg)
ver = info.verActive or ([None] + info.verAll)[-1]
if ver:
return RubyParser(Cellar.rubyPath(pkg, ver)).parseHomepageUrl()
return None
@staticmethod
def isKegOnly(pkg: str, version: str) -> bool:
''' Check if package is keg-only '''
return RubyParser(Cellar.rubyPath(pkg, version)).parseKegOnly()
def shortPath(path: str) -> str:
''' Return truncated path (relative to `Cellar.ROOT`) '''
# if OPT and BIN will be stored separately, check each path separately
return os.path.relpath(path, Cellar.ROOT)
# -----------------------------------
@@ -1274,7 +1350,7 @@ class TarPackage:
def extract(self, *, dryRun: bool = False) -> 'PkgVer|None':
''' Extract tar file into `@/cellar/...` '''
shortPath = os.path.relpath(self.fname, Cellar.ROOT)
shortPath = Cellar.shortPath(self.fname)
if shortPath.startswith('..'): # if path outside of cellar
shortPath = os.path.basename(self.fname)
@@ -1400,9 +1476,8 @@ class InstallQueue:
def add(self, pkg: str, version: str, digest: 'str|None') -> None:
''' Check if specific version exists and add to download queue '''
info = Cellar.info(pkg)
# skip if a specific version already exists
if not self.force and version in info.verAll:
if not self.force and version in LocalPackage(pkg).allVersions:
# TODO: print already installed?
return
if not digest:
@@ -1450,34 +1525,24 @@ class InstallQueue:
for i, tar in enumerate(reversed(self.installQueue), 1):
bundle = TarPackage(tar).extract(dryRun=self.dryRun)
if bundle and not self.dryRun:
self.postInstall(
bundle.package, bundle.version, File.sha256(tar),
skipLink=skipLink, linkExe=linkExe, isPrimary=i == total)
Log.endCounter()
Log.dumpErrorSummary()
def postInstall(
self, pkg: str, version: str, digest: str, *,
skipLink: bool, linkExe: bool, isPrimary: bool,
) -> None:
# copy digest of tar file into install dir
with open(Cellar.configPath(pkg, version, 'digest'), 'w') as fp:
fp.write(digest)
File.touch(Cellar.configPath(
pkg, version, 'primary' if isPrimary else 'secondary'))
# post-install stuff
pkg = LocalPackage(bundle.package)
pkg.setDigest(bundle.version, File.sha256(tar))
pkg.setPrimary(i == total)
# relink dylibs
Fixer.run(pkg, version)
pkg.fix(bundle.version)
if not skipLink:
if Cellar.isKegOnly(pkg, version):
Log.warn(f'keg-only, must link manually ({pkg}, {version})',
summary=True)
if skipLink:
continue
if pkg.isKegOnly(bundle.version):
Log.warn('keg-only, must link manually ({}, {})'.format(
pkg.name, bundle.version), summary=True)
else:
withBin = Env.LINK_BINARIES if linkExe is None else linkExe
Cellar.unlinkPackage(pkg)
Cellar.linkPackage(pkg, version, noExe=not withBin)
pkg.unlink()
pkg.link(bundle.version, noExe=not withBin)
Log.endCounter()
Log.dumpErrorSummary()
# -----------------------------------
@@ -1485,26 +1550,6 @@ class InstallQueue:
# -----------------------------------
class Fixer:
@staticmethod
def run(pkg: str, version: str) -> None:
path = Cellar.installPath(pkg, version)
if not os.path.isfile(Cellar.rubyPath(pkg, version)):
Log.error('not a brew-package directory', path, summary=True)
return
for base, dirs, files in os.walk(path):
for file in files:
fname = os.path.join(base, file)
if os.path.islink(fname):
Fixer.symlink(fname)
continue
# magic number check for Mach-O
with open(fname, 'rb') as fp:
if fp.read(4) != b'\xcf\xfa\xed\xfe':
continue
Fixer.dylib(fname, pkg, version)
@staticmethod
def symlink(fname: str) -> None:
''' Fix time on symlink, copy time from target link '''
@@ -1541,7 +1586,7 @@ class Fixer:
newRef = '@executable_path/' + newRef
if not did_change:
Log.info(' fix dylib', os.path.relpath(fname, Cellar.ROOT))
Log.info(' fix dylib', Cellar.shortPath(fname))
Log.debug(' OLD:', oldRef)
Log.debug(' NEW:', newRef)
@@ -1567,7 +1612,7 @@ class UninstallQueue:
# used by other packages (secondary dependencies with multiple parents)
self.skips = {} # type: dict[str, set[str]] # {pkg: {deps}}
# list of packages that will be removed
self.uninstallQueue = [] # type: list[str]
self.uninstallQueue = [] # type: list[LocalPackage]
def collect(
self, deletePkgs: list[str], hiddenPkgs: list[str], *,
@@ -1591,12 +1636,15 @@ class UninstallQueue:
self.warnings = {pkg: deps for pkg in deletePkgs
if (deps := getDeps(pkg) - hidden)}
def setUninstallQueue(pkgs: list[str]) -> None:
self.uninstallQueue = [LocalPackage(x) for x in pkgs]
# user said "these aren't the packages you're looking for"
activelyIgnored = depTree.obsolete(hiddenPkgs)
if ignoreDependencies:
setWarnings(activelyIgnored.union(deletePkgs))
self.uninstallQueue = deletePkgs # TODO: copy?
setUninstallQueue(deletePkgs)
self.skips = {}
return
@@ -1621,7 +1669,7 @@ class UninstallQueue:
removed -= depTree.forward.missing(removed)
setWarnings(hidden)
self.uninstallQueue = sorted(removed)
setUninstallQueue(sorted(removed))
irrelevant = removed.union(hiddenPkgs)
self.skips = {pkg: deps for pkg in skipped
if (deps := getDeps(pkg) - irrelevant)}
@@ -1651,8 +1699,7 @@ class UninstallQueue:
Log.info('==> Remove symlinks for', countPkgs, 'packages')
countSym = 0
for pkg in self.uninstallQueue:
links = Cellar.unlinkPackage(
pkg, dryRun=dryRun, quiet=dryRun and Log.LEVEL <= 2)
links = pkg.unlink(dryRun=dryRun, quiet=dryRun and Log.LEVEL <= 2)
countSym += len(links)
Log.main('Would remove' if dryRun else 'Removed', countSym, 'symlinks')
@@ -1660,8 +1707,7 @@ class UninstallQueue:
Log.info('==> Uninstall', countPkgs, 'packages')
total_savings = 0
for pkg in self.uninstallQueue:
path = Cellar.installPath(pkg)
total_savings += File.remove(path, dryRun=dryRun)
total_savings += File.remove(pkg.path, dryRun=dryRun)
Log.info('==> This operation {} approximately {} of disk space'.format(
'would free' if dryRun else 'has freed',
@@ -1670,7 +1716,7 @@ class UninstallQueue:
if dryRun:
print()
print('The following packages will be removed:')
Utils.printInColumns(self.uninstallQueue)
Utils.printInColumns([x.name for x in self.uninstallQueue])
if self.skips:
print()
print('The following packages will NOT be removed:')
@@ -1685,7 +1731,7 @@ class RubyParser:
PRINT_PARSE_ERRORS = True
ASSERT_KNOWN_SYMBOLS = False
IGNORE_RULES = False
FAKE_INSTALLED = set() # type: set[str] # simulate Cellar.info().installed
FAKE_INSTALLED = set() # type: set[str] # simulate LocalPackage.installed
IGNORED_TARGETS = set([':optional', ':build', ':test'])
TARGET_SYMBOLS = IGNORED_TARGETS.union([':recommended'])
@@ -1956,7 +2002,7 @@ class RubyParser:
elif clause.startswith('Formula["') and \
clause.endswith('"].any_version_installed?'):
pkg = clause.split('"')[1]
return Cellar.info(pkg).installed or pkg in self.FAKE_INSTALLED
return LocalPackage(pkg).installed or pkg in self.FAKE_INSTALLED
elif clause.startswith('build.with? "'):
pkg = clause.split('"')[1]
@@ -2086,7 +2132,7 @@ class File:
Log.main('{}: {} ({}{})'.format(
'Would remove' if dryRun else 'Removing',
os.path.relpath(path, Cellar.ROOT),
Cellar.shortPath(path),
f'{files} files, ' if isdir else '',
Utils.humanSize(size)))
if not dryRun: