#!/usr/bin/env python3
from typing import TYPE_CHECKING, Iterable
from multiprocessing import Pool
from pathlib import Path
from urllib.parse import quote
from urllib.request import Request, urlopen, urlretrieve
from argparse import ArgumentParser
from sys import stderr
import plistlib
import sqlite3
import json
import gzip
import os
import re
import warnings

with warnings.catch_warnings():  # hide macOS LibreSSL warning
    warnings.filterwarnings('ignore')
    from remotezip import RemoteZip  # pip install remotezip

if TYPE_CHECKING:
    from zipfile import ZipInfo

USE_ZIP_FILESIZE = False
re_info_plist = re.compile(r'Payload/([^/]+)/Info\.plist')
# re_links = re.compile(r'''<a [^>]*href=["']([^>]+\.ipa)["'][^>]*>''')
re_archive_url = re.compile(
    r'https?://archive\.org/(?:metadata|details|download)/([^/]+)(?:/.*)?')

CACHE_DIR = Path(__file__).parent / 'data'
CACHE_DIR.mkdir(exist_ok=True)


def main():
    CacheDB().init()
    parser = ArgumentParser()
    cli = parser.add_subparsers(metavar='command', dest='cmd', required=True)
    cmd = cli.add_parser('add', help='Add urls to cache')
    cmd.add_argument('urls', metavar='URL', nargs='+',
                     help='Search URLs for .ipa links')
    cmd = cli.add_parser('run', help='Download and process pending urls')
    cmd.add_argument('-force', '-f', action='store_true',
                     help='Reindex local data / populate DB. '
                          'Make sure to export fsize before!')
    cmd.add_argument('pk', metavar='PK', type=int, nargs='*',
                     help='Primary key')
    cmd = cli.add_parser('export', help='Export data')
    cmd.add_argument('export_type', choices=['json', 'fsize'],
                     help='Export to json or temporary-filesize file')
    cmd = cli.add_parser('err', help='Handle problematic entries')
    cmd.add_argument('err_type', choices=['reset'], help='Set done=0 to retry')
    cmd = cli.add_parser('get', help='Lookup value')
    cmd.add_argument('get_type', choices=['url', 'img', 'ipa'],
                     help='Get data field or download image.')
    cmd.add_argument('pk', metavar='PK', type=int, nargs='+',
                     help='Primary key')
    cmd = cli.add_parser('set', help='(Re)set value')
    cmd.add_argument('set_type', choices=['err'], help='Data field/column')
    cmd.add_argument('pk', metavar='PK', type=int, nargs='+',
                     help='Primary key')
    args = parser.parse_args()

    if args.cmd == 'add':
        for url in args.urls:
            crawler(url)
        print('done.')
    elif args.cmd == 'run':
        DB = CacheDB()
        if args.pk:
            for pk in args.pk:
                url = DB.getUrl(pk)
                print(pk, ': process', url)
                loadIpa(pk, url, overwrite=True)
        else:
            if args.force:
                print('Resetting done state ...')
                DB.setAllUndone(whereDone=1)
            processPending()
    elif args.cmd == 'err':
        if args.err_type == 'reset':
            print('Resetting error state ...')
            CacheDB().setAllUndone(whereDone=3)
    elif args.cmd == 'export':
        if args.export_type == 'json':
            export_json()
        elif args.export_type == 'fsize':
            export_filesize()
    elif args.cmd == 'get':
        DB = CacheDB()
        if args.get_type == 'url':
            for pk in args.pk:
                print(pk, ':', DB.getUrl(pk))
        elif args.get_type == 'img':
            for pk in args.pk:
                url = DB.getUrl(pk)
                print(pk, ': load image', url)
                loadIpa(pk, url, overwrite=True, image_only=True)
        elif args.get_type == 'ipa':
            dir = Path('ipa_download')
            dir.mkdir(exist_ok=True)
            for pk in args.pk:
                url = DB.getUrl(pk)
                print(pk, ': load ipa', url)
                urlretrieve(url, dir / f'{pk}.ipa', printProgress)
                print(end='\r')
    elif args.cmd == 'set':
        DB = CacheDB()
        if args.set_type == 'err':
            for pk in args.pk:
                print(pk, ': set done=4')
                DB.setPermanentError(pk)
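
# A minimal usage sketch. The script filename and the archive item are
# placeholder assumptions, not part of this file:
#
#   python3 ipa_archive.py add https://archive.org/details/<some-archive-item>
#   python3 ipa_archive.py run            # download + index pending .ipa links
#   python3 ipa_archive.py export json    # write ipa.json + urls.json to data/
#   python3 ipa_archive.py get url 42     # print the download URL for pk 42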


###############################################
# Database
###############################################

class CacheDB:
    def __init__(self) -> None:
        self._db = sqlite3.connect(CACHE_DIR / 'ipa_cache.db')
        self._db.execute('pragma busy_timeout=5000')

    def init(self):
        self._db.execute('''
        CREATE TABLE IF NOT EXISTS urls(
            pk INTEGER PRIMARY KEY,
            url TEXT NOT NULL UNIQUE
        );
        ''')
        self._db.execute('''
        CREATE TABLE IF NOT EXISTS idx(
            pk INTEGER PRIMARY KEY,
            base_url INTEGER NOT NULL,
            path_name TEXT NOT NULL,
            done INTEGER DEFAULT 0,
            fsize INTEGER DEFAULT 0,
            min_os INTEGER DEFAULT NULL,
            platform INTEGER DEFAULT NULL,
            title TEXT DEFAULT NULL,
            bundle_id TEXT DEFAULT NULL,
            version TEXT DEFAULT NULL,
            UNIQUE(base_url, path_name) ON CONFLICT ABORT,
            FOREIGN KEY (base_url) REFERENCES urls (pk) ON DELETE RESTRICT
        );
        ''')

    def __del__(self) -> None:
        self._db.close()

    # insert URLs

    def insertBaseUrl(self, base: str) -> int:
        try:
            x = self._db.execute('INSERT INTO urls (url) VALUES (?);', [base])
            self._db.commit()
            return x.lastrowid  # type: ignore
        except sqlite3.IntegrityError:
            x = self._db.execute('SELECT pk FROM urls WHERE url = ?;', [base])
            return x.fetchone()[0]

    def insertIpaUrls(self, entries: 'Iterable[tuple[int, str, int]]') -> int:
        self._db.executemany('''
            INSERT OR IGNORE INTO idx (base_url, path_name, fsize)
            VALUES (?,?,?);''', entries)
        self._db.commit()
        return self._db.total_changes

    def getUrl(self, uid: int) -> str:
        x = self._db.execute('''SELECT url, path_name FROM idx
            INNER JOIN urls ON urls.pk=base_url WHERE idx.pk=?;''', [uid])
        base, path = x.fetchone()
        return base + '/' + quote(path)

    # Export JSON

    def jsonUrlMap(self) -> 'dict[int, str]':
        x = self._db.execute('SELECT pk, url FROM urls')
        rv = {}
        for pk, url in x:
            rv[pk] = url
        return rv

    def enumJsonIpa(self, *, done: int) -> Iterable[tuple]:
        # the REPLACE/RTRIM expression falls back to the file basename
        # (everything after the last '/') whenever no title is set
        yield from self._db.execute('''
            SELECT pk, platform, IFNULL(min_os, 0), TRIM(IFNULL(title,
                REPLACE(path_name,RTRIM(path_name,REPLACE(path_name,'/','')),'')
            )) as tt, IFNULL(bundle_id, ""), version, base_url, path_name,
                fsize / 1024
            FROM idx WHERE done=?
            ORDER BY tt COLLATE NOCASE, min_os, platform, version;''', [done])

    # Filesize

    def enumFilesize(self) -> Iterable[tuple]:
        yield from self._db.execute('SELECT pk, fsize FROM idx WHERE fsize>0;')

    def setFilesize(self, uid: int, size: int) -> None:
        if size > 0:
            self._db.execute('UPDATE idx SET fsize=? WHERE pk=?;', [size, uid])
            self._db.commit()

    # Process Pending

    def count(self, *, done: int) -> int:
        x = self._db.execute('SELECT COUNT() FROM idx WHERE done=?;', [done])
        return x.fetchone()[0]

    def getPendingQueue(self, *, done: int, batchsize: int) \
            -> 'list[tuple[int, str, str]]':
        # url || "/" || REPLACE(REPLACE(path_name, '#', '%23'), '?', '%3F')
        x = self._db.execute('''SELECT idx.pk, url, path_name FROM idx
            INNER JOIN urls ON urls.pk=base_url WHERE done=? LIMIT ?;''',
            [done, batchsize])
        return x.fetchall()

    def setAllUndone(self, *, whereDone: int) -> None:
        self._db.execute('UPDATE idx SET done=0 WHERE done=?;', [whereDone])
        self._db.commit()

    # Finalize / Postprocessing
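
    # Note on the 'done' states, inferred from how they are set and queried
    # in this script (no other values appear here):
    #   0 = pending, 1 = processed successfully, 3 = error (retry via
    #   'err reset'), 4 = permanent error (set via the 'set err' command).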

    def setError(self, uid: int, *, done: int) -> None:
        self._db.execute('UPDATE idx SET done=? WHERE pk=?;', [done, uid])
        self._db.commit()

    def setPermanentError(self, uid: int) -> None:
        ''' Set done=4 and all file related columns to NULL.
        Will also delete all plist and image files for {uid} in CACHE_DIR.
        '''
        self._db.execute('''
            UPDATE idx SET done=4, min_os=NULL, platform=NULL, title=NULL,
            bundle_id=NULL, version=NULL WHERE pk=?;''', [uid])
        self._db.commit()
        for ext in ['.plist', '.png', '.jpg']:
            fname = diskPath(uid, ext)
            if fname.exists():
                os.remove(fname)

    def setDone(self, uid: int) -> None:
        plist_path = diskPath(uid, '.plist')
        if not plist_path.exists():
            return
        with open(plist_path, 'rb') as fp:
            try:
                plist = plistlib.load(fp)
            except Exception as e:
                print(f'ERROR: [{uid}] PLIST: {e}', file=stderr)
                self.setError(uid, done=3)
                return
        bundleId = plist.get('CFBundleIdentifier')
        title = plist.get('CFBundleDisplayName') or plist.get('CFBundleName')
        version = str(plist.get('CFBundleVersion', ''))
        v_short = str(plist.get('CFBundleShortVersionString', ''))
        if not version:
            version = v_short
        if version != v_short and v_short:
            version = f'{version} ({v_short})'
        minOS = [int(x) for x in plist.get('MinimumOSVersion', '0').split('.')]
        minOS += [0, 0, 0]  # ensures at least 3 components are given
        platforms = sum(1 << int(x) for x in plist.get('UIDeviceFamily', []))
        if not platforms and minOS[0] in [0, 1, 2, 3]:
            platforms = 1 << 1  # fallback to iPhone for old versions
        self._db.execute('''
            UPDATE idx SET done=1, min_os=?, platform=?, title=?, bundle_id=?,
            version=? WHERE pk=?;''', [
            # min_os encoded as major * 10000 + minor * 100 + patch
            (minOS[0] * 10000 + minOS[1] * 100 + minOS[2]) or None,
            platforms or None,
            title or None,
            bundleId or None,
            version or None,
            uid,
        ])
        self._db.commit()


###############################################
# [add] Process HTML link list
###############################################

def crawler(url: str) -> None:
    match = re_archive_url.match(url)
    if not match:
        print(f'[WARN] not an archive.org url. Ignoring "{url}"', file=stderr)
        return
    downloadListArchiveOrg(match.group(1))


def downloadListArchiveOrg(archiveId: str) -> None:
    baseUrl = f'https://archive.org/download/{archiveId}'
    baseUrlId = CacheDB().insertBaseUrl(baseUrl)
    json_file = CACHE_DIR / 'url_cache' / (str(baseUrlId) + '.json.gz')
    json_file.parent.mkdir(exist_ok=True)
    # store json for later
    if not json_file.exists():
        print(f'load: [{baseUrlId}] {baseUrl}')
        req = Request(f'https://archive.org/metadata/{archiveId}/files')
        req.add_header('Accept-Encoding', 'deflate, gzip')
        # the response is expected to arrive gzip-encoded and is stored
        # verbatim, so it can be re-read below with gzip.open()
        with urlopen(req) as page:
            with open(json_file, 'wb') as fp:
                while True:
                    block = page.read(8096)
                    if not block:
                        break
                    fp.write(block)
    # read saved json from disk
    with gzip.open(json_file, 'rb') as fp:
        data = json.load(fp)
    # process and add to DB
    entries = [(baseUrlId, x['name'], int(x.get('size', 0)))
               for x in data['result']
               if x['source'] == 'original' and x['name'].endswith('.ipa')]
    inserted = CacheDB().insertIpaUrls(entries)
    print(f'new links added: {inserted} of {len(entries)}')
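
# Sketch of the metadata response consumed above, reduced to the fields this
# script actually reads (field values are illustrative, not real data):
#
#   {"result": [
#       {"name": "Some App 1.0.ipa", "source": "original", "size": "1234567"},
#       ...
#   ]}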


###############################################
# [run] Process pending urls from DB
###############################################

def processPending():
    processed = 0
    with Pool(processes=8) as pool:
        while True:
            DB = CacheDB()
            pending = DB.count(done=0)
            batch = DB.getPendingQueue(done=0, batchsize=100)
            del DB
            if not batch:
                print('Queue empty. done.')
                break
            # prepend progress counters (#processed so far, #still queued)
            batch = [(processed + i + 1, pending - i - 1, *x)
                     for i, x in enumerate(batch)]
            result = pool.starmap_async(procSinglePending, batch).get()
            processed += len(result)
            DB = CacheDB()
            for uid, success in result:
                fsize = onceReadSizeFromFile(uid)
                if fsize:
                    DB.setFilesize(uid, fsize)
                if success:
                    DB.setDone(uid)
                else:
                    DB.setError(uid, done=3)
            del DB

    DB = CacheDB()
    err_count = DB.count(done=3)
    if err_count > 0:
        print()
        print('URLs with Error:', err_count)
        for uid, base, path_name in DB.getPendingQueue(done=3, batchsize=10):
            print(f' - [{uid}] {base}/{quote(path_name)}')


def procSinglePending(
    processed: int, pending: int, uid: int, base_url: str, path_name
) -> 'tuple[int, bool]':
    url = base_url + '/' + quote(path_name)
    humanUrl = url.split('archive.org/download/')[-1]
    print(f'[{processed}|{pending} queued]: load[{uid}] {humanUrl}')
    try:
        return uid, loadIpa(uid, url)
    except Exception as e:
        print(f'ERROR: [{uid}] {e}', file=stderr)
        return uid, False


def onceReadSizeFromFile(uid: int) -> 'int|None':
    size_path = diskPath(uid, '.size')
    if size_path.exists():
        with open(size_path, 'r') as fp:
            size = int(fp.read())
        os.remove(size_path)
        return size
    return None


###############################################
# Process IPA zip
###############################################

def loadIpa(uid: int, url: str, *, overwrite: bool = False,
            image_only: bool = False) -> bool:
    basename = diskPath(uid, '')
    basename.parent.mkdir(exist_ok=True)
    img_path = basename.with_suffix('.png')
    plist_path = basename.with_suffix('.plist')
    if not overwrite and plist_path.exists():
        return True

    with RemoteZip(url) as zip:
        if USE_ZIP_FILESIZE:
            filesize = zip.fp.tell() if zip.fp else 0
            with open(basename.with_suffix('.size'), 'w') as fp:
                fp.write(str(filesize))

        app_name = None
        artwork = False
        zip_listing = zip.infolist()
        has_payload_folder = False
        for entry in zip_listing:
            fn = entry.filename.lstrip('/')
            has_payload_folder |= fn.startswith('Payload/')
            plist_match = re_info_plist.match(fn)
            if fn == 'iTunesArtwork':
                extractZipEntry(zip, entry, img_path)
                artwork = os.path.getsize(img_path) > 0
            elif plist_match:
                app_name = plist_match.group(1)
                if not image_only:
                    extractZipEntry(zip, entry, plist_path)

        if not has_payload_folder:
            print(f'ERROR: [{uid}] ipa has no "Payload/" root folder',
                  file=stderr)

        # if no iTunesArtwork found, load file referenced in plist
        if not artwork and app_name and plist_path.exists():
            with open(plist_path, 'rb') as fp:
                icon_names = iconNameFromPlist(plistlib.load(fp))
            icon = expandImageName(zip_listing, app_name, icon_names)
            if icon:
                extractZipEntry(zip, icon, img_path)
    return plist_path.exists()


def extractZipEntry(zip: 'RemoteZip', zipInfo: 'ZipInfo', dest_filename: Path):
    with zip.open(zipInfo) as src:
        with open(dest_filename, 'wb') as tgt:
            tgt.write(src.read())


###############################################
# Icon name extraction
###############################################

RESOLUTION_ORDER = ['3x', '2x', '180', '167', '152', '120']


def expandImageName(
    zip_listing: 'list[ZipInfo]', appName: str, iconList: 'list[str]'
) -> 'ZipInfo|None':
    for iconName in iconList + ['Icon', 'icon']:
        zipPath = f'Payload/{appName}/{iconName}'
        matchingNames = [x.filename.split('/', 2)[-1] for x in zip_listing
                         if x.filename.lstrip('/').startswith(zipPath)]
        if len(matchingNames) > 0:
            for bestName in sortedByResolution(matchingNames):
                bestPath = f'Payload/{appName}/{bestName}'
                for x in zip_listing:
                    if x.filename.lstrip('/') == bestPath and x.file_size > 0:
                        return x
    return None
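

# Sketch of the Info.plist icon structure unpacked below; only the keys this
# script reads are shown and the concrete values are illustrative examples:
#
#   CFBundleIcons = {
#       'CFBundlePrimaryIcon': {
#           'CFBundleIconFiles': ['AppIcon29x29', 'AppIcon60x60@2x'],
#           'CFBundleIconName': 'AppIcon',
#       }
#   }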

def unpackNameListFromPlistDict(bundleDict: 'dict|None') -> 'list[str]|None':
    if not bundleDict:
        return None
    primaryDict = bundleDict.get('CFBundlePrimaryIcon', {})
    icons = primaryDict.get('CFBundleIconFiles')
    if not icons:
        singular = primaryDict.get('CFBundleIconName')
        if singular:
            return [singular]
    return icons


def resolutionIndex(icon_name: str):
    for i, match in enumerate(RESOLUTION_ORDER):
        if match in icon_name:
            return i
    if 'small' in icon_name.lower():
        return 99
    return 50


def sortedByResolution(icons: 'list[str]') -> 'list[str]':
    icons.sort(key=resolutionIndex)
    return icons


def iconNameFromPlist(plist: dict) -> 'list[str]':
    # Check for CFBundleIcons (since 5.0)
    icons = unpackNameListFromPlistDict(plist.get('CFBundleIcons'))
    if not icons:
        icons = unpackNameListFromPlistDict(plist.get('CFBundleIcons~ipad'))
    if not icons:
        # Check for CFBundleIconFiles (since 3.2)
        icons = plist.get('CFBundleIconFiles')
    if not icons:
        # key found on iTunesU app
        icons = plist.get('Icon files')
    if not icons:
        # Check for CFBundleIconFile (legacy, before 3.2)
        icon = plist.get('CFBundleIconFile')  # may be None
        return [icon] if icon else []
    return sortedByResolution(icons)


###############################################
# [json] Export to json
###############################################

def export_json():
    DB = CacheDB()
    url_map = DB.jsonUrlMap()
    maxUrlId = max(url_map.keys())
    # just a visual separator
    maxUrlId += 1
    url_map[maxUrlId] = '---'
    submap = {}
    total = DB.count(done=1)
    with open(CACHE_DIR / 'ipa.json', 'w') as fp:
        fp.write('[')
        for i, entry in enumerate(DB.enumJsonIpa(done=1)):
            if i % 113 == 0:
                print(f'\rprocessing [{i}/{total}]', end='')
            # if path_name is in a subdirectory, reindex URLs
            if '/' in entry[7]:
                baseurl = url_map[entry[6]]
                sub_dir, sub_file = entry[7].split('/', 1)
                newurl = baseurl + '/' + sub_dir
                subIdx = submap.get(newurl, None)
                if subIdx is None:
                    maxUrlId += 1
                    submap[newurl] = maxUrlId
                    subIdx = maxUrlId
                entry = list(entry)
                entry[6] = subIdx
                entry[7] = sub_file
            fp.write(json.dumps(entry, separators=(',', ':')) + ',\n')
        # overwrite the trailing ',\n' with the closing bracket
        fp.seek(max(fp.tell(), 3) - 2)
        fp.write(']')
    print('\r', end='')
    print(f'write ipa.json: {total} entries')

    for newurl, newidx in submap.items():
        url_map[newidx] = newurl
    with open(CACHE_DIR / 'urls.json', 'w') as fp:
        fp.write(json.dumps(url_map, separators=(',\n', ':')))
    print(f'write urls.json: {len(url_map)} entries')


def export_filesize():
    ignored = 0
    written = 0
    for i, (uid, fsize) in enumerate(CacheDB().enumFilesize()):
        size_path = diskPath(uid, '.size')
        if not size_path.exists():
            with open(size_path, 'w') as fp:
                fp.write(str(fsize))
            written += 1
        else:
            ignored += 1
        if i % 113 == 0:
            print(f'\r{written} files written. {ignored} ignored', end='')
    print(f'\r{written} files written. {ignored} ignored. done.')


###############################################
# Helper
###############################################

def diskPath(uid: int, ext: str) -> Path:
    return CACHE_DIR / str(uid // 1000) / f'{uid}{ext}'


def printProgress(blocknum, bs, size):
    if size == 0:
        return
    percent = (blocknum * bs) / size
    done = "#" * int(40 * percent)
    print(f'\r[{done:<40}] {percent:.1%}', end='')


# def b64e(text: str) -> str:
#     return b64encode(text.encode('utf8')).decode('ascii')


if __name__ == '__main__':
    main()