feat: TarPackage

This commit is contained in:
relikd
2025-08-31 13:45:15 +02:00
parent c156b6d66b
commit 023983bcbb

189
brew.py
View File

@@ -1213,44 +1213,6 @@ class Cellar:
pkgs = filterPkg if filterPkg else sorted(os.listdir(Cellar.CELLAR))
return [info for pkg in pkgs if (info := Cellar.info(pkg)).installed]
# Install management
@staticmethod
def installTar(tarPath: str, *, dryRun: bool = False) \
-> 'tuple[str, str]|None':
''' Extract tar file into `@/cellar/...` '''
shortPath = os.path.relpath(tarPath, Cellar.ROOT)
if shortPath.startswith('..'): # if path outside of cellar
shortPath = os.path.basename(tarPath)
if not os.path.isfile(tarPath):
if dryRun:
Log.main('would install', shortPath, count=True)
return None
pkg, version = None, None
with openTarfile(tarPath, 'r') as tar:
subset = []
for x in tar:
if tarFilter(x, Cellar.CELLAR):
subset.append(x)
if not pkg and x.isdir() and x.path.endswith('/.brew'):
pkg, version, *_ = x.path.split('/')
else:
Log.error(f'prohibited tar entry "{x.path}" in', shortPath,
summary=True)
if pkg is None or version is None:
Log.error('".brew" dir missing. Failed to extract', shortPath,
summary=True, count=True)
return None
Log.main('would install' if dryRun else 'install', shortPath,
f'({pkg} {version})', count=True)
if not dryRun:
tar.extractall(Cellar.CELLAR, subset)
return pkg, version
@staticmethod
def getDependencyTree() -> DependencyTree:
''' Returns dict object for dependency traversal '''
@@ -1371,6 +1333,100 @@ class Cellar:
return RubyParser(Cellar.rubyPath(pkg, version)).parseKegOnly()
# -----------------------------------
# TarPackage
# -----------------------------------
class TarPackage:
class PkgVer(NamedTuple):
package: str
version: str
def __init__(self, fname: str) -> None:
self.fname = fname
def extract(self, *, dryRun: bool = False) -> 'PkgVer|None':
''' Extract tar file into `@/cellar/...` '''
shortPath = os.path.relpath(self.fname, Cellar.ROOT)
if shortPath.startswith('..'): # if path outside of cellar
shortPath = os.path.basename(self.fname)
if not os.path.isfile(self.fname):
if dryRun:
Log.main('would install', shortPath, count=True)
return None
pkg, version = None, None
with openTarfile(self.fname, 'r') as tar:
subset = []
for x in tar:
if self.filter(x, Cellar.CELLAR):
subset.append(x)
if not pkg and x.isdir() and x.path.endswith('/.brew'):
pkg, version, *_ = x.path.split('/')
else:
Log.error(f'prohibited tar entry "{x.path}" in', shortPath,
summary=True)
if pkg is None or version is None:
Log.error('".brew" dir missing. Failed to extract', shortPath,
summary=True, count=True)
return None
Log.main('would install' if dryRun else 'install', shortPath,
f'({pkg} {version})', count=True)
if not dryRun:
tar.extractall(Cellar.CELLAR, subset)
return TarPackage.PkgVer(pkg, version)
# Copied from Python 3.12 tarfile _get_filtered_attrs
def filter(self, member: TarInfo, dest_path: str) -> bool:
'''Remove dangerous tar elements (relative dir escape & permissions)'''
dest_path = os.path.realpath(dest_path)
# Strip leading / (tar's directory separator) from filenames.
# Include os.sep (target OS directory separator) as well.
if member.name.startswith(('/', os.sep)):
return False
# Ensure we stay in the destination
target_path = os.path.realpath(os.path.join(dest_path, member.name))
if os.path.commonpath([target_path, dest_path]) != dest_path:
return False
# Limit permissions (no high bits, and go-w)
if member.mode is not None:
# Strip high bits & group/other write bits
member.mode &= 0o755
# For data, handle permissions & file types
if member.isreg() or member.islnk():
if not member.mode & 0o100:
# Clear executable bits if not executable by user
member.mode &= ~0o111
# Ensure owner can read & write
member.mode |= 0o600
elif member.isdir() or member.issym():
# Ignore mode for directories & symlinks
pass
else:
# Reject special files
return False
# Check link destination for 'data'
if member.islnk() or member.issym():
if os.path.isabs(member.linkname):
return False
normalized = os.path.normpath(member.linkname)
if normalized != member.linkname:
member.linkname = normalized
if member.issym():
target_path = os.path.join(
dest_path, os.path.dirname(member.name), member.linkname)
else:
target_path = os.path.join(dest_path, member.linkname)
target_path = os.path.realpath(target_path)
if os.path.commonpath([target_path, dest_path]) != dest_path:
return False
return True
# -----------------------------------
# InstallQueue
# -----------------------------------
@@ -1465,10 +1521,10 @@ class InstallQueue:
Log.beginErrorSummary()
# reverse to install main package last (allow re-install until success)
for i, tar in enumerate(reversed(self.installQueue), 1):
bundle = Cellar.installTar(tar, dryRun=self.dryRun)
if bundle:
bundle = TarPackage(tar).extract(dryRun=self.dryRun)
if bundle and not self.dryRun:
self.postInstall(
bundle[0], bundle[1], File.sha256(tar),
bundle.package, bundle.version, File.sha256(tar),
skipLink=skipLink, linkExe=linkExe, isPrimary=i == total)
Log.endCounter()
Log.dumpErrorSummary()
@@ -2343,57 +2399,6 @@ class Log:
Log._SUMMARY = None
# -----------------------------------
# Misc
# -----------------------------------
# Copied from Python 3.12 tarfile _get_filtered_attrs
def tarFilter(member: TarInfo, dest_path: str) -> bool:
dest_path = os.path.realpath(dest_path)
# Strip leading / (tar's directory separator) from filenames.
# Include os.sep (target OS directory separator) as well.
if member.name.startswith(('/', os.sep)):
return False
# Ensure we stay in the destination
target_path = os.path.realpath(os.path.join(dest_path, member.name))
if os.path.commonpath([target_path, dest_path]) != dest_path:
return False
# Limit permissions (no high bits, and go-w)
if member.mode is not None:
# Strip high bits & group/other write bits
member.mode &= 0o755
# For data, handle permissions & file types
if member.isreg() or member.islnk():
if not member.mode & 0o100:
# Clear executable bits if not executable by user
member.mode &= ~0o111
# Ensure owner can read & write
member.mode |= 0o600
elif member.isdir() or member.issym():
# Ignore mode for directories & symlinks
pass
else:
# Reject special files
return False
# Check link destination for 'data'
if member.islnk() or member.issym():
if os.path.isabs(member.linkname):
return False
normalized = os.path.normpath(member.linkname)
if normalized != member.linkname:
member.linkname = normalized
if member.issym():
target_path = os.path.join(
dest_path, os.path.dirname(member.name), member.linkname)
else:
target_path = os.path.join(dest_path, member.linkname)
target_path = os.path.realpath(target_path)
if os.path.commonpath([target_path, dest_path]) != dest_path:
return False
return True
if __name__ == '__main__':
main()