From 023983bcbb252ca44bd2acaadfb123c2be22f138 Mon Sep 17 00:00:00 2001 From: relikd Date: Sun, 31 Aug 2025 13:45:15 +0200 Subject: [PATCH] feat: TarPackage --- brew.py | 189 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 97 insertions(+), 92 deletions(-) diff --git a/brew.py b/brew.py index f9baf3b..90bc3c4 100755 --- a/brew.py +++ b/brew.py @@ -1213,44 +1213,6 @@ class Cellar: pkgs = filterPkg if filterPkg else sorted(os.listdir(Cellar.CELLAR)) return [info for pkg in pkgs if (info := Cellar.info(pkg)).installed] - # Install management - - @staticmethod - def installTar(tarPath: str, *, dryRun: bool = False) \ - -> 'tuple[str, str]|None': - ''' Extract tar file into `@/cellar/...` ''' - shortPath = os.path.relpath(tarPath, Cellar.ROOT) - if shortPath.startswith('..'): # if path outside of cellar - shortPath = os.path.basename(tarPath) - - if not os.path.isfile(tarPath): - if dryRun: - Log.main('would install', shortPath, count=True) - return None - - pkg, version = None, None - with openTarfile(tarPath, 'r') as tar: - subset = [] - for x in tar: - if tarFilter(x, Cellar.CELLAR): - subset.append(x) - if not pkg and x.isdir() and x.path.endswith('/.brew'): - pkg, version, *_ = x.path.split('/') - else: - Log.error(f'prohibited tar entry "{x.path}" in', shortPath, - summary=True) - - if pkg is None or version is None: - Log.error('".brew" dir missing. Failed to extract', shortPath, - summary=True, count=True) - return None - - Log.main('would install' if dryRun else 'install', shortPath, - f'({pkg} {version})', count=True) - if not dryRun: - tar.extractall(Cellar.CELLAR, subset) - return pkg, version - @staticmethod def getDependencyTree() -> DependencyTree: ''' Returns dict object for dependency traversal ''' @@ -1371,6 +1333,100 @@ class Cellar: return RubyParser(Cellar.rubyPath(pkg, version)).parseKegOnly() +# ----------------------------------- +# TarPackage +# ----------------------------------- + +class TarPackage: + class PkgVer(NamedTuple): + package: str + version: str + + def __init__(self, fname: str) -> None: + self.fname = fname + + def extract(self, *, dryRun: bool = False) -> 'PkgVer|None': + ''' Extract tar file into `@/cellar/...` ''' + shortPath = os.path.relpath(self.fname, Cellar.ROOT) + if shortPath.startswith('..'): # if path outside of cellar + shortPath = os.path.basename(self.fname) + + if not os.path.isfile(self.fname): + if dryRun: + Log.main('would install', shortPath, count=True) + return None + + pkg, version = None, None + with openTarfile(self.fname, 'r') as tar: + subset = [] + for x in tar: + if self.filter(x, Cellar.CELLAR): + subset.append(x) + if not pkg and x.isdir() and x.path.endswith('/.brew'): + pkg, version, *_ = x.path.split('/') + else: + Log.error(f'prohibited tar entry "{x.path}" in', shortPath, + summary=True) + + if pkg is None or version is None: + Log.error('".brew" dir missing. Failed to extract', shortPath, + summary=True, count=True) + return None + + Log.main('would install' if dryRun else 'install', shortPath, + f'({pkg} {version})', count=True) + if not dryRun: + tar.extractall(Cellar.CELLAR, subset) + return TarPackage.PkgVer(pkg, version) + + # Copied from Python 3.12 tarfile _get_filtered_attrs + def filter(self, member: TarInfo, dest_path: str) -> bool: + '''Remove dangerous tar elements (relative dir escape & permissions)''' + dest_path = os.path.realpath(dest_path) + # Strip leading / (tar's directory separator) from filenames. + # Include os.sep (target OS directory separator) as well. + if member.name.startswith(('/', os.sep)): + return False + # Ensure we stay in the destination + target_path = os.path.realpath(os.path.join(dest_path, member.name)) + if os.path.commonpath([target_path, dest_path]) != dest_path: + return False + # Limit permissions (no high bits, and go-w) + if member.mode is not None: + # Strip high bits & group/other write bits + member.mode &= 0o755 + # For data, handle permissions & file types + if member.isreg() or member.islnk(): + if not member.mode & 0o100: + # Clear executable bits if not executable by user + member.mode &= ~0o111 + # Ensure owner can read & write + member.mode |= 0o600 + elif member.isdir() or member.issym(): + # Ignore mode for directories & symlinks + pass + else: + # Reject special files + return False + + # Check link destination for 'data' + if member.islnk() or member.issym(): + if os.path.isabs(member.linkname): + return False + normalized = os.path.normpath(member.linkname) + if normalized != member.linkname: + member.linkname = normalized + if member.issym(): + target_path = os.path.join( + dest_path, os.path.dirname(member.name), member.linkname) + else: + target_path = os.path.join(dest_path, member.linkname) + target_path = os.path.realpath(target_path) + if os.path.commonpath([target_path, dest_path]) != dest_path: + return False + return True + + # ----------------------------------- # InstallQueue # ----------------------------------- @@ -1465,10 +1521,10 @@ class InstallQueue: Log.beginErrorSummary() # reverse to install main package last (allow re-install until success) for i, tar in enumerate(reversed(self.installQueue), 1): - bundle = Cellar.installTar(tar, dryRun=self.dryRun) - if bundle: + bundle = TarPackage(tar).extract(dryRun=self.dryRun) + if bundle and not self.dryRun: self.postInstall( - bundle[0], bundle[1], File.sha256(tar), + bundle.package, bundle.version, File.sha256(tar), skipLink=skipLink, linkExe=linkExe, isPrimary=i == total) Log.endCounter() Log.dumpErrorSummary() @@ -2343,57 +2399,6 @@ class Log: Log._SUMMARY = None -# ----------------------------------- -# Misc -# ----------------------------------- - -# Copied from Python 3.12 tarfile _get_filtered_attrs -def tarFilter(member: TarInfo, dest_path: str) -> bool: - dest_path = os.path.realpath(dest_path) - # Strip leading / (tar's directory separator) from filenames. - # Include os.sep (target OS directory separator) as well. - if member.name.startswith(('/', os.sep)): - return False - # Ensure we stay in the destination - target_path = os.path.realpath(os.path.join(dest_path, member.name)) - if os.path.commonpath([target_path, dest_path]) != dest_path: - return False - # Limit permissions (no high bits, and go-w) - if member.mode is not None: - # Strip high bits & group/other write bits - member.mode &= 0o755 - # For data, handle permissions & file types - if member.isreg() or member.islnk(): - if not member.mode & 0o100: - # Clear executable bits if not executable by user - member.mode &= ~0o111 - # Ensure owner can read & write - member.mode |= 0o600 - elif member.isdir() or member.issym(): - # Ignore mode for directories & symlinks - pass - else: - # Reject special files - return False - - # Check link destination for 'data' - if member.islnk() or member.issym(): - if os.path.isabs(member.linkname): - return False - normalized = os.path.normpath(member.linkname) - if normalized != member.linkname: - member.linkname = normalized - if member.issym(): - target_path = os.path.join( - dest_path, os.path.dirname(member.name), member.linkname) - else: - target_path = os.path.join(dest_path, member.linkname) - target_path = os.path.realpath(target_path) - if os.path.commonpath([target_path, dest_path]) != dest_path: - return False - return True - - if __name__ == '__main__': main()