#!/usr/bin/env python3
from typing import TYPE_CHECKING, Iterable
from multiprocessing import Pool
from pathlib import Path
from urllib.parse import quote
from urllib.request import Request, urlopen, urlretrieve
from argparse import ArgumentParser
from sys import stderr
import plistlib
import sqlite3
import json
import gzip
import os
import re
import warnings

with warnings.catch_warnings():  # hide macOS LibreSSL warning
    warnings.filterwarnings('ignore')
    from remotezip import RemoteZip  # pip install remotezip

if TYPE_CHECKING:
    from zipfile import ZipInfo

USE_ZIP_FILESIZE = False
re_info_plist = re.compile(r'Payload/([^/]+)/Info\.plist')
# re_links = re.compile(r'''<a[^>]*href=["']([^>]+\.ipa)["'][^>]*>''')
re_archive_url = re.compile(
    r'https?://archive.org/(?:metadata|details|download)/([^/]+)(?:/.*)?')

CACHE_DIR = Path(__file__).parent / 'data'
CACHE_DIR.mkdir(exist_ok=True)


def main():
    CacheDB().init()
    parser = ArgumentParser()
    cli = parser.add_subparsers(metavar='command', dest='cmd', required=True)
    cmd = cli.add_parser('add', help='Add urls to cache')
    cmd.add_argument('urls', metavar='URL', nargs='+',
                     help='Search URLs for .ipa links')
    cmd = cli.add_parser('update', help='Update all urls')
    cmd.add_argument('urls', metavar='URL', nargs='*', help='URLs or index')
    cmd = cli.add_parser('run', help='Download and process pending urls')
    cmd.add_argument('-force', '-f', action='store_true',
                     help='Reindex local data / populate DB. '
                          'Make sure to export fsize before!')
    cmd.add_argument('pk', metavar='PK', type=int, nargs='*',
                     help='Primary key')
    cmd = cli.add_parser('export', help='Export data')
    cmd.add_argument('export_type', choices=['json', 'fsize'],
                     help='Export to json or temporary-filesize file')
    cmd = cli.add_parser('err', help='Handle problematic entries')
    cmd.add_argument('err_type', choices=['reset'], help='Set done=0 to retry')
    cmd = cli.add_parser('get', help='Lookup value')
    cmd.add_argument('get_type', choices=['url', 'img', 'ipa'],
                     help='Get data field or download image.')
    cmd.add_argument('pk', metavar='PK', type=int, nargs='+',
                     help='Primary key')
    cmd = cli.add_parser('set', help='(Re)set value')
    cmd.add_argument('set_type', choices=['err'], help='Data field/column')
    cmd.add_argument('pk', metavar='PK', type=int, nargs='+',
                     help='Primary key')
    args = parser.parse_args()

    if args.cmd == 'add':
        for url in args.urls:
            addNewUrl(url)
        print('done.')
    elif args.cmd == 'update':
        queue = args.urls or CacheDB().getUpdateUrlIds(sinceNow='-7 days')
        if queue:
            for i, url in enumerate(queue):
                updateUrl(url, i + 1, len(queue))
            print('done.')
        else:
            print('Nothing to do.')
    elif args.cmd == 'run':
        DB = CacheDB()
        if args.pk:
            for pk in args.pk:
                url = DB.getUrl(pk)
                print(pk, ': process', url)
                loadIpa(pk, url, overwrite=True)
        else:
            if args.force:
                print('Resetting done state ...')
                DB.setAllUndone(whereDone=1)
            processPending()
    elif args.cmd == 'err':
        if args.err_type == 'reset':
            print('Resetting error state ...')
            CacheDB().setAllUndone(whereDone=3)
    elif args.cmd == 'export':
        if args.export_type == 'json':
            export_json()
        elif args.export_type == 'fsize':
            export_filesize()
    elif args.cmd == 'get':
        DB = CacheDB()
        if args.get_type == 'url':
            for pk in args.pk:
                print(pk, ':', DB.getUrl(pk))
        elif args.get_type == 'img':
            for pk in args.pk:
                url = DB.getUrl(pk)
                print(pk, ': load image', url)
                loadIpa(pk, url, overwrite=True, image_only=True)
        elif args.get_type == 'ipa':
            out_dir = Path('ipa_download')  # renamed from `dir` (builtin)
            out_dir.mkdir(exist_ok=True)
            for pk in args.pk:
                url = DB.getUrl(pk)
                print(pk, ': load ipa', url)
                urlretrieve(url, out_dir / f'{pk}.ipa', printProgress)
                print(end='\r')
    elif args.cmd == 'set':
        DB = CacheDB()
        if args.set_type == 'err':
            for pk in args.pk:
                print(pk, ': set done=4')
                DB.setPermanentError(pk)
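
# The `done` column in table `idx` acts as a small state machine (values
# inferred from how the code in this file uses them):
#   0 = pending, waiting to be processed by `run`
#   1 = processed successfully (setDone)
#   3 = error, retryable via `err reset` (setError)
#   4 = permanent error, metadata and cached files purged (setPermanentError)
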
###############################################
# Database
###############################################

class CacheDB:
    def __init__(self) -> None:
        self._db = sqlite3.connect(CACHE_DIR / 'ipa_cache.db')
        self._db.execute('pragma busy_timeout=5000')

    def init(self):
        self._db.execute('''
        CREATE TABLE IF NOT EXISTS urls(
            pk INTEGER PRIMARY KEY,
            url TEXT NOT NULL UNIQUE,
            date INTEGER DEFAULT (strftime('%s','now'))
        );
        ''')
        self._db.execute('''
        CREATE TABLE IF NOT EXISTS idx(
            pk INTEGER PRIMARY KEY,
            base_url INTEGER NOT NULL,
            path_name TEXT NOT NULL,
            done INTEGER DEFAULT 0,
            fsize INTEGER DEFAULT 0,
            min_os INTEGER DEFAULT NULL,
            platform INTEGER DEFAULT NULL,
            title TEXT DEFAULT NULL,
            bundle_id TEXT DEFAULT NULL,
            version TEXT DEFAULT NULL,
            UNIQUE(base_url, path_name) ON CONFLICT ABORT,
            FOREIGN KEY (base_url) REFERENCES urls (pk) ON DELETE RESTRICT
        );
        ''')

    def __del__(self) -> None:
        self._db.close()

    # Get URL

    def getIdForBaseUrl(self, url: str) -> 'int|None':
        x = self._db.execute('SELECT pk FROM urls WHERE url=?', [url])
        row = x.fetchone()
        return row[0] if row else None

    def getBaseUrlForId(self, uid: int) -> 'str|None':
        x = self._db.execute('SELECT url FROM urls WHERE pk=?', [uid])
        row = x.fetchone()
        return row[0] if row else None

    def getId(self, baseUrlId: int, pathName: str) -> 'int|None':
        x = self._db.execute('''SELECT pk FROM idx
            WHERE base_url=? AND path_name=?;''', [baseUrlId, pathName])
        row = x.fetchone()
        return row[0] if row else None

    def getUrl(self, uid: int) -> str:
        x = self._db.execute('''SELECT url, path_name FROM idx
            INNER JOIN urls ON urls.pk=base_url WHERE idx.pk=?;''', [uid])
        base, path = x.fetchone()
        return base + '/' + quote(path)

    # Insert URL

    def insertBaseUrl(self, base: str) -> int:
        try:
            x = self._db.execute('INSERT INTO urls (url) VALUES (?);', [base])
            self._db.commit()
            return x.lastrowid  # type: ignore
        except sqlite3.IntegrityError:
            x = self._db.execute('SELECT pk FROM urls WHERE url = ?;', [base])
            return x.fetchone()[0]

    def insertIpaUrls(
        self, baseUrlId: int, entries: 'Iterable[tuple[str, int, str]]'
    ) -> int:
        '''
        :entries: must be iterable of `(path_name, filesize, crc32)`
        '''
        self._db.executemany('''
            INSERT OR IGNORE INTO idx (base_url, path_name, fsize)
            VALUES (?,?,?);
            ''', ((baseUrlId, path, size) for path, size, _crc in entries))
        self._db.commit()
        return self._db.total_changes

    # Update URL

    def getUpdateUrlIds(self, *, sinceNow: str) -> 'list[int]':
        x = self._db.execute('''SELECT pk FROM urls
            WHERE date IS NULL OR date < strftime('%s','now', ?)''', [sinceNow])
        return [row[0] for row in x.fetchall()]

    def markBaseUrlUpdated(self, uid: int) -> None:
        self._db.execute('''
            UPDATE urls SET date=strftime('%s','now') WHERE pk=?''', [uid])
        self._db.commit()

    def updateIpaUrl(self, baseUrlId: int, entry: 'tuple[str, int, str]') \
            -> 'int|None':
        '''
        :entry: must be `(path_name, filesize, crc32)`
        '''
        uid = self.getId(baseUrlId, entry[0])
        if uid:
            self._db.execute('UPDATE idx SET done=0, fsize=? WHERE pk=?;',
                             [entry[1], uid])
            self._db.commit()
            return uid
        if self.insertIpaUrls(baseUrlId, [entry]) > 0:
            x = self._db.execute('SELECT MAX(pk) FROM idx;')
            return x.fetchone()[0]
        return None
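
    # The export query in enumJsonIpa() below derives a fallback title from
    # path_name with a pure-SQLite "basename" trick:
    #   REPLACE(path_name, '/', '')      strips all slashes,
    #   RTRIM(path_name, <that>)         then trims trailing characters until
    #                                    it hits a '/', leaving the dir prefix,
    #   REPLACE(path_name, <prefix>, '') removes that prefix again.
    # e.g. 'games/My App 1.0.ipa' -> 'My App 1.0.ipa'
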
    # Export JSON

    def jsonUrlMap(self) -> 'dict[int, str]':
        x = self._db.execute('SELECT pk, url FROM urls')
        return {pk: url for pk, url in x}

    def enumJsonIpa(self, *, done: int) -> Iterable[tuple]:
        yield from self._db.execute('''
            SELECT pk, platform, IFNULL(min_os, 0),
                TRIM(IFNULL(title, REPLACE(path_name,
                    RTRIM(path_name, REPLACE(path_name, '/', '')), ''))) as tt,
                IFNULL(bundle_id, ''), version, base_url, path_name,
                fsize / 1024
            FROM idx WHERE done=?
            ORDER BY tt COLLATE NOCASE, min_os, platform, version;''', [done])

    # Filesize

    def enumFilesize(self) -> Iterable[tuple]:
        yield from self._db.execute('SELECT pk, fsize FROM idx WHERE fsize>0;')

    def setFilesize(self, uid: int, size: int) -> None:
        if size > 0:
            self._db.execute('UPDATE idx SET fsize=? WHERE pk=?;', [size, uid])
            self._db.commit()

    # Process Pending

    def count(self, *, done: int) -> int:
        x = self._db.execute('SELECT COUNT() FROM idx WHERE done=?;', [done])
        return x.fetchone()[0]

    def getPendingQueue(self, *, done: int, batchsize: int) \
            -> 'list[tuple[int, str, str]]':
        # url || "/" || REPLACE(REPLACE(path_name, '#', '%23'), '?', '%3F')
        x = self._db.execute('''SELECT idx.pk, url, path_name FROM idx
            INNER JOIN urls ON urls.pk=base_url
            WHERE done=? LIMIT ?;''', [done, batchsize])
        return x.fetchall()

    def setAllUndone(self, *, whereDone: int) -> None:
        self._db.execute('UPDATE idx SET done=0 WHERE done=?;', [whereDone])
        self._db.commit()

    # Finalize / Postprocessing

    def setError(self, uid: int, *, done: int) -> None:
        self._db.execute('UPDATE idx SET done=? WHERE pk=?;', [done, uid])
        self._db.commit()

    def setPermanentError(self, uid: int) -> None:
        '''
        Set done=4 and all file-related columns to NULL.
        Will also delete all plist and image files for {uid} in CACHE_DIR.
        '''
        self._db.execute('''
            UPDATE idx SET done=4, min_os=NULL, platform=NULL, title=NULL,
            bundle_id=NULL, version=NULL WHERE pk=?;''', [uid])
        self._db.commit()
        for ext in ['.plist', '.png', '.jpg']:
            fname = diskPath(uid, ext)
            if fname.exists():
                os.remove(fname)

    def setDone(self, uid: int) -> None:
        plist_path = diskPath(uid, '.plist')
        if not plist_path.exists():
            return
        with open(plist_path, 'rb') as fp:
            try:
                plist = plistlib.load(fp)
            except Exception as e:
                print(f'ERROR: [{uid}] PLIST: {e}', file=stderr)
                self.setError(uid, done=3)
                return
        bundleId = plist.get('CFBundleIdentifier')
        title = plist.get('CFBundleDisplayName') or plist.get('CFBundleName')
        v_short = str(plist.get('CFBundleShortVersionString', ''))
        v_long = str(plist.get('CFBundleVersion', ''))
        version = v_short or v_long
        if version != v_long and v_long:
            version += f' ({v_long})'
        minOS = [int(x) for x in plist.get('MinimumOSVersion', '0').split('.')]
        minOS += [0, 0, 0]  # ensure at least 3 components are given
        platforms = sum(1 << int(x) for x in plist.get('UIDeviceFamily', []))
        if not platforms and minOS[0] in [0, 1, 2, 3]:
            platforms = 1 << 1  # fallback to iPhone for old versions
        self._db.execute('''
            UPDATE idx SET done=1, min_os=?, platform=?, title=?,
            bundle_id=?, version=? WHERE pk=?;''', [
            (minOS[0] * 10000 + minOS[1] * 100 + minOS[2]) or None,
            platforms or None,
            title or None,
            bundleId or None,
            version or None,
            uid,
        ])
        self._db.commit()
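

# Typical workflow (the script name `ipa_index.py` is just a placeholder for
# however this file is saved):
#   python3 ipa_index.py add https://archive.org/details/<item>
#   python3 ipa_index.py run             # download + parse pending ipa files
#   python3 ipa_index.py export json     # write data/ipa.json + data/urls.json
#   python3 ipa_index.py update          # re-check items older than 7 days
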
###############################################
# [add] Process HTML link list
###############################################

def addNewUrl(url: str) -> None:
    archiveId = extractArchiveOrgId(url)
    if not archiveId:
        return
    baseUrlId = CacheDB().insertBaseUrl(urlForArchiveOrgId(archiveId))
    json_file = pathToListJson(baseUrlId)
    entries = downloadListArchiveOrg(archiveId, json_file)
    inserted = CacheDB().insertIpaUrls(baseUrlId, entries)
    print(f'new links added: {inserted} of {len(entries)}')


def extractArchiveOrgId(url: str) -> 'str|None':
    match = re_archive_url.match(url)
    if not match:
        print(f'[WARN] not an archive.org url. Ignoring "{url}"', file=stderr)
        return None
    return match.group(1)


def urlForArchiveOrgId(archiveId: str) -> str:
    return f'https://archive.org/download/{archiveId}'


def pathToListJson(baseUrlId: int, *, tmp: bool = False) -> Path:
    if tmp:
        return CACHE_DIR / 'url_cache' / f'tmp_{baseUrlId}.json.gz'
    return CACHE_DIR / 'url_cache' / f'{baseUrlId}.json.gz'


def downloadListArchiveOrg(
    archiveId: str, json_file: Path, *, force: bool = False
) -> 'list[tuple[str, int, str]]':
    '''
    :returns: List of `(path_name, file_size, crc32)`
    '''
    # store json for later
    if force or not json_file.exists():
        json_file.parent.mkdir(exist_ok=True)
        print(f'load: {archiveId}')
        req = Request(f'https://archive.org/metadata/{archiveId}/files')
        req.add_header('Accept-Encoding', 'deflate, gzip')
        # NOTE: the response body is written to disk as-is and later read with
        # gzip.open() -- this assumes archive.org honors the gzip encoding.
        with urlopen(req) as page:
            with open(json_file, 'wb') as fp:
                while True:
                    block = page.read(8096)
                    if not block:
                        break
                    fp.write(block)
    # read saved json from disk
    with gzip.open(json_file, 'rb') as fp:
        data = json.load(fp)
    # keep only original (non-derivative) .ipa files
    return [(x['name'], int(x.get('size', 0)), x.get('crc32'))
            for x in data['result']
            if x['source'] == 'original' and x['name'].endswith('.ipa')]


###############################################
# [update] Re-index existing URL caches
###############################################

def updateUrl(url_or_uid: 'str|int', proc_i: int, proc_total: int):
    baseUrlId, url = _lookupBaseUrl(url_or_uid)
    if not baseUrlId or not url:
        print(f'[ERROR] Ignoring "{url_or_uid}". Not found in DB',
              file=stderr)
        return
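
    # Diff strategy: compare the cached file list with a freshly downloaded
    # one as sets of (path_name, size, crc32) tuples. Entries that vanished
    # upstream are marked as permanent errors; new or changed entries are
    # (re)queued with done=0 via updateIpaUrl().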
    archiveId = extractArchiveOrgId(url) or ''  # guaranteed to return str
    print(f'Updating [{proc_i}/{proc_total}] {archiveId}')
    old_json_file = pathToListJson(baseUrlId)
    new_json_file = pathToListJson(baseUrlId, tmp=True)
    old_entries = set(downloadListArchiveOrg(archiveId, old_json_file))
    new_entries = set(downloadListArchiveOrg(archiveId, new_json_file))
    old_diff = old_entries - new_entries
    new_diff = new_entries - old_entries
    DB = CacheDB()
    if old_diff or new_diff:
        c_del = 0
        c_new = 0
        for old_entry in old_diff:  # no need to sort
            uid = DB.getId(baseUrlId, old_entry[0])
            if uid:
                print(f' rm: [{uid}] {old_entry}')
                DB.setPermanentError(uid)
                c_del += 1
            else:
                print(f' [ERROR] could not find old entry {old_entry[0]}',
                      file=stderr)
        for new_entry in sorted(new_diff):
            uid = DB.updateIpaUrl(baseUrlId, new_entry)
            if uid:
                print(f' add: [{uid}] {new_entry}')
                c_new += 1
            else:
                print(f' [ERROR] updating {new_entry[0]}', file=stderr)
        print(f' updated -{c_del}/+{c_new} entries.')
        os.rename(new_json_file, old_json_file)
    else:
        print(' no changes.')
    DB.markBaseUrlUpdated(baseUrlId)
    if new_json_file.exists():
        os.remove(new_json_file)


def _lookupBaseUrl(url_or_index: 'str|int') -> 'tuple[int|None, str|None]':
    if isinstance(url_or_index, str) and url_or_index.isnumeric():
        url_or_index = int(url_or_index)
    if isinstance(url_or_index, int):
        baseUrlId = url_or_index
        url = CacheDB().getBaseUrlForId(baseUrlId)
    else:
        archiveId = extractArchiveOrgId(url_or_index)
        if not archiveId:
            return None, None
        url = urlForArchiveOrgId(archiveId)
        baseUrlId = CacheDB().getIdForBaseUrl(url)
    return baseUrlId, url


###############################################
# [run] Process pending urls from DB
###############################################

def processPending():
    processed = 0
    with Pool(processes=8) as pool:
        while True:
            DB = CacheDB()
            pending = DB.count(done=0)
            batch = DB.getPendingQueue(done=0, batchsize=100)
            del DB
            if not batch:
                print('Queue empty. done.')
                break
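
            # Each work item is (1-based counter, jobs remaining, pk,
            # base url, path_name), unpacked by procSinglePending() below.
            # Workers only download and write cache files; all DB writes
            # happen afterwards in this parent process.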
            batch = [(processed + i + 1, pending - i - 1, *x)
                     for i, x in enumerate(batch)]
            result = pool.starmap_async(procSinglePending, batch).get()
            processed += len(result)
            DB = CacheDB()
            for uid, success in result:
                fsize = onceReadSizeFromFile(uid)
                if fsize:
                    DB.setFilesize(uid, fsize)
                if success:
                    DB.setDone(uid)
                else:
                    DB.setError(uid, done=3)
            del DB
    DB = CacheDB()
    err_count = DB.count(done=3)
    if err_count > 0:
        print()
        print('URLs with Error:', err_count)
        for uid, base, path_name in DB.getPendingQueue(done=3, batchsize=10):
            print(f' - [{uid}] {base}/{quote(path_name)}')


def procSinglePending(
    processed: int, pending: int, uid: int, base_url: str, path_name: str
) -> 'tuple[int, bool]':
    url = base_url + '/' + quote(path_name)
    humanUrl = url.split('archive.org/download/')[-1]
    print(f'[{processed}|{pending} queued]: load[{uid}] {humanUrl}')
    try:
        return uid, loadIpa(uid, url)
    except Exception as e:
        print(f'ERROR: [{uid}] {e}', file=stderr)
        return uid, False


def onceReadSizeFromFile(uid: int) -> 'int|None':
    size_path = diskPath(uid, '.size')
    if size_path.exists():
        with open(size_path, 'r') as fp:
            size = int(fp.read())
        os.remove(size_path)
        return size
    return None


###############################################
# Process IPA zip
###############################################

def loadIpa(uid: int, url: str, *, overwrite: bool = False,
            image_only: bool = False) -> bool:
    basename = diskPath(uid, '')
    basename.parent.mkdir(exist_ok=True)
    img_path = basename.with_suffix('.png')
    plist_path = basename.with_suffix('.plist')
    if not overwrite and plist_path.exists():
        return True
    with RemoteZip(url) as zf:  # renamed from `zip` (builtin)
        if USE_ZIP_FILESIZE:
            filesize = zf.fp.tell() if zf.fp else 0
            with open(basename.with_suffix('.size'), 'w') as fp:
                fp.write(str(filesize))
        app_name = None
        artwork = False
        zip_listing = zf.infolist()
        has_payload_folder = False
        for entry in zip_listing:
            fn = entry.filename.lstrip('/')
            has_payload_folder |= fn.startswith('Payload/')
            plist_match = re_info_plist.match(fn)
            if fn == 'iTunesArtwork':
                extractZipEntry(zf, entry, img_path)
                artwork = os.path.getsize(img_path) > 0
            elif plist_match:
                app_name = plist_match.group(1)
                if not image_only:
                    extractZipEntry(zf, entry, plist_path)
        if not has_payload_folder:
            print(f'ERROR: [{uid}] ipa has no "Payload/" root folder',
                  file=stderr)
        # if no iTunesArtwork found, load file referenced in plist
        if not artwork and app_name and plist_path.exists():
            with open(plist_path, 'rb') as fp:
                icon_names = iconNameFromPlist(plistlib.load(fp))
            icon = expandImageName(zip_listing, app_name, icon_names)
            if icon:
                extractZipEntry(zf, icon, img_path)
    return plist_path.exists()


def extractZipEntry(zf: 'RemoteZip', zipInfo: 'ZipInfo', dest_filename: Path):
    with zf.open(zipInfo) as src:
        with open(dest_filename, 'wb') as tgt:
            tgt.write(src.read())


###############################################
# Icon name extraction
###############################################

RESOLUTION_ORDER = ['3x', '2x', '180', '167', '152', '120']


def expandImageName(
    zip_listing: 'list[ZipInfo]', appName: str, iconList: 'list[str]'
) -> 'ZipInfo|None':
    for iconName in iconList + ['Icon', 'icon']:
        zipPath = f'Payload/{appName}/{iconName}'
        matchingNames = [x.filename.split('/', 2)[-1] for x in zip_listing
                         if x.filename.lstrip('/').startswith(zipPath)]
        if len(matchingNames) > 0:
            for bestName in sortedByResolution(matchingNames):
                bestPath = f'Payload/{appName}/{bestName}'
                for x in zip_listing:
                    if x.filename.lstrip('/') == bestPath and x.file_size > 0:
                        return x
    return None
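

# Candidate icons are ranked via RESOLUTION_ORDER: names containing '3x',
# '2x' or a known pixel size (180/167/152/120) sort first, names containing
# 'small' sort last, everything else lands in between. For example:
#   sortedByResolution(['Icon-Small.png', 'Icon.png', 'Icon@2x.png'])
#   -> ['Icon@2x.png', 'Icon.png', 'Icon-Small.png']
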
def unpackNameListFromPlistDict(bundleDict: 'dict|None') -> 'list[str]|None':
    if not bundleDict:
        return None
    primaryDict = bundleDict.get('CFBundlePrimaryIcon', {})
    icons = primaryDict.get('CFBundleIconFiles')
    if not icons:
        singular = primaryDict.get('CFBundleIconName')
        if singular:
            return [singular]
    return icons


def resolutionIndex(icon_name: str) -> int:
    for i, match in enumerate(RESOLUTION_ORDER):
        if match in icon_name:
            return i
    if 'small' in icon_name.lower():
        return 99  # de-prioritize e.g. 'Icon-Small.png'
    return 50  # unknown resolution, between known sizes and 'small'


def sortedByResolution(icons: 'list[str]') -> 'list[str]':
    icons.sort(key=resolutionIndex)
    return icons


def iconNameFromPlist(plist: dict) -> 'list[str]':
    # Check for CFBundleIcons (since 5.0)
    icons = unpackNameListFromPlistDict(plist.get('CFBundleIcons'))
    if not icons:
        icons = unpackNameListFromPlistDict(plist.get('CFBundleIcons~ipad'))
    if not icons:
        # Check for CFBundleIconFiles (since 3.2)
        icons = plist.get('CFBundleIconFiles')
    if not icons:
        # key found on iTunesU app
        icons = plist.get('Icon files')
    if not icons:
        # Check for CFBundleIconFile (legacy, before 3.2)
        icon = plist.get('CFBundleIconFile')  # may be None
        return [icon] if icon else []
    return sortedByResolution(icons)


###############################################
# [json] Export to json
###############################################

def export_json():
    DB = CacheDB()
    url_map = DB.jsonUrlMap()
    maxUrlId = max(url_map.keys())
    # just a visual separator
    maxUrlId += 1
    url_map[maxUrlId] = '---'
    submap = {}
    total = DB.count(done=1)
    with open(CACHE_DIR / 'ipa.json', 'w') as fp:
        fp.write('[')
        for i, entry in enumerate(DB.enumJsonIpa(done=1)):
            if i % 113 == 0:
                print(f'\rprocessing [{i}/{total}]', end='')
            # if path_name is in a subdirectory, reindex URLs
            if '/' in entry[7]:
                baseurl = url_map[entry[6]]
                sub_dir, sub_file = entry[7].split('/', 1)
                newurl = baseurl + '/' + sub_dir
                subIdx = submap.get(newurl, None)
                if subIdx is None:
                    maxUrlId += 1
                    submap[newurl] = maxUrlId
                    subIdx = maxUrlId
                entry = list(entry)
                entry[6] = subIdx
                entry[7] = sub_file
            fp.write(json.dumps(entry, separators=(',', ':')) + ',\n')
        # rewind over the trailing ',\n' (keeps '[' intact if list was empty)
        fp.seek(max(fp.tell(), 3) - 2)
        fp.write(']')
    print('\r', end='')
    print(f'write ipa.json: {total} entries')
    for newurl, newidx in submap.items():
        url_map[newidx] = newurl
    with open(CACHE_DIR / 'urls.json', 'w') as fp:
        fp.write(json.dumps(url_map, separators=(',\n', ':'), sort_keys=True))
    print(f'write urls.json: {len(url_map)} entries')


def export_filesize():
    ignored = 0
    written = 0
    for i, (uid, fsize) in enumerate(CacheDB().enumFilesize()):
        size_path = diskPath(uid, '.size')
        size_path.parent.mkdir(exist_ok=True)  # subdir may not exist yet
        if not size_path.exists():
            with open(size_path, 'w') as fp:
                fp.write(str(fsize))
            written += 1
        else:
            ignored += 1
        if i % 113 == 0:
            print(f'\r{written} files written. {ignored} ignored', end='')
    print(f'\r{written} files written. {ignored} ignored. done.')


###############################################
# Helper
###############################################

def diskPath(uid: int, ext: str) -> Path:
    return CACHE_DIR / str(uid // 1000) / f'{uid}{ext}'


def printProgress(blocknum, bs, size):
    if size <= 0:
        return
    percent = min(1.0, (blocknum * bs) / size)  # last block may overshoot
    done = "#" * int(40 * percent)
    print(f'\r[{done:<40}] {percent:.1%}', end='')


# def b64e(text: str) -> str:
#     return b64encode(text.encode('utf8')).decode('ascii')


if __name__ == '__main__':
    main()