From 78ee345a2e0f54f4c7d2207f4f1baaa15e29e718 Mon Sep 17 00:00:00 2001 From: relikd Date: Tue, 21 Feb 2023 14:10:24 +0100 Subject: [PATCH] feat: add types --- ABCDDB.py | 509 ++++++++++++++++++++++++++----------------------- vcard2image.py | 5 +- 2 files changed, 277 insertions(+), 237 deletions(-) diff --git a/ABCDDB.py b/ABCDDB.py index deed9cc..df6c88b 100755 --- a/ABCDDB.py +++ b/ABCDDB.py @@ -3,59 +3,36 @@ import sys import sqlite3 from base64 import b64encode from urllib.parse import quote +from typing import List, Dict, Any, Iterable, Optional PROD_ID = '-//Apple Inc.//Mac OS X 10.15.7//EN' ITEM_COUNTER = 0 -class ABCDDB(object): - @staticmethod - def load(db_path): - db = sqlite3.connect(db_path) - cur = db.cursor() +# =============================== +# Helper methods +# =============================== - records = Record.queryAll(cur) - - # query once, then distribute - for x in Email.queryAll(cur): - records[x.parent].email.append(x) - - for x in Phone.queryAll(cur): - records[x.parent].phone.append(x) - - for x in Address.queryAll(cur): - records[x.parent].address.append(x) - - for x in SocialProfile.queryAll(cur): - records[x.parent].socialprofile.append(x) - - for x in Note.queryAll(cur): - records[x.parent].note = x.text - - for x in URL.queryAll(cur): - records[x.parent].urls.append(x) - - for x in Service.queryAll(cur): - records[x.parent].service.append(x) - - db.close() - return records.values() - - -def incrItem(value, label): +def incrItem(value: str, label: str) -> str: global ITEM_COUNTER ITEM_COUNTER += 1 return 'item{0}.{1}\r\nitem{0}.X-ABLabel:{2}'.format( ITEM_COUNTER, value, label) -def x520(val): +def x520(val: str) -> Optional[str]: if not val: return None return val.replace(';', '\\;').replace(',', '\\,') -def buildLabel(prefix, label, isFirst, suffix, validOther=False): +def buildLabel( + prefix: str, + label: str, + isFirst: bool, + suffix: str, + validOther: bool = False, +) -> str: typ = '' if label == '_$!!$_': typ = ';type=HOME' @@ -71,9 +48,209 @@ def buildLabel(prefix, label, isFirst, suffix, validOther=False): return incrItem(value, label) -class Record(object): +# =============================== +# VCARD Attributes +# =============================== + +class Queryable: # Protocol @staticmethod - def queryAll(cursor): + def queryAll(cursor: sqlite3.Cursor) -> Iterable['Queryable']: + raise NotImplementedError() + + def __init__(self, row: List[Any]): + self._parent = -1 + raise NotImplementedError() + + @property + def parent(self) -> int: + return self._parent + + def asStr(self, markPref: bool) -> str: + raise NotImplementedError() + + +class Email(Queryable): + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Iterable['Email']: + return (Email(x) for x in cursor.execute(''' + SELECT ZOWNER, ZLABEL, ZADDRESS + FROM ZABCDEMAILADDRESS + ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + + def __init__(self, row: List[Any]): + self._parent = row[0] # type: int + self.label = x520(row[1]) or '' # type: str + self.email = x520(row[2]) or '' # type: str + + def asStr(self, markPref: bool) -> str: + return buildLabel('EMAIL;type=INTERNET', self.label, markPref, + self.email) + + +class Phone(Queryable): + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Iterable['Phone']: + return (Phone(x) for x in cursor.execute(''' + SELECT ZOWNER, ZLABEL, ZFULLNUMBER + FROM ZABCDPHONENUMBER + ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + + def __init__(self, row: List[Any]): + self._parent = row[0] # type: int + self.label = x520(row[1]) or '' # type: str + self.number = x520(row[2]) or '' # type: str + + def asStr(self, markPref: bool) -> str: + mapping = { + '_$!!$_': ';type=CELL;type=VOICE', + 'iPhone': ';type=IPHONE;type=CELL;type=VOICE', + '_$!!$_': ';type=HOME;type=VOICE', + '_$!!$_': ';type=WORK;type=VOICE', + '_$!
!$_': ';type=MAIN', + '_$!!$_': ';type=HOME;type=FAX', + '_$!!$_': ';type=WORK;type=FAX', + '_$!!$_': ';type=OTHER;type=FAX', + '_$!!$_': ';type=PAGER', + '_$!!$_': ';type=OTHER;type=VOICE' + } + value = (';type=pref:' if markPref else ':') + self.number + if self.label in mapping.keys(): + return 'TEL' + mapping[self.label] + value + else: + return incrItem('TEL' + value, self.label) + + +class Address(Queryable): + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Iterable['Address']: + return (Address(x) for x in cursor.execute(''' + SELECT ZOWNER, ZLABEL, + ZSTREET, ZCITY, ZSTATE, ZZIPCODE, ZCOUNTRYNAME + FROM ZABCDPOSTALADDRESS + ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + + def __init__(self, row: List[Any]): + self._parent = row[0] # type: int + self.label = x520(row[1]) or '' # type: str + self.street = x520(row[2]) or '' # type: str + self.city = x520(row[3]) or '' # type: str + self.state = x520(row[4]) or '' # type: str + self.zip = x520(row[5]) or '' # type: str + self.country = x520(row[6]) or '' # type: str + + def asStr(self, markPref: bool) -> str: + value = ';'.join((self.street, self.city, self.state, self.zip, + self.country)) + return buildLabel('ADR', self.label, markPref, ';;' + value, + validOther=True) + + +class SocialProfile(Queryable): + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Iterable['SocialProfile']: + return (SocialProfile(x) for x in cursor.execute(''' + SELECT ZOWNER, ZSERVICENAME, ZUSERNAME + FROM ZABCDSOCIALPROFILE;''')) + + def __init__(self, row: List[Any]): + self._parent = row[0] # type: int + self.service = row[1] or '' # type: str + # no x520(); actually, Apple does that ... and it breaks on reimport + self.user = row[2] or '' # type: str + + def asStr(self, markPref: bool) -> str: + # Apple does some x-user, x-apple, and url stuff that is wrong + return 'X-SOCIALPROFILE;type=' + self.service.lower() + ':' + self.user + + +class Note(Queryable): + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Iterable['Note']: + return (Note(x) for x in cursor.execute(''' + SELECT ZCONTACT, ZTEXT + FROM ZABCDNOTE + WHERE ZTEXT IS NOT NULL;''')) + + def __init__(self, row: List[Any]): + self._parent = row[0] # type: int + self.text = x520(row[1]) or '' # type: str + + def asStr(self, markPref: bool) -> str: + return self.text + + +class URL(Queryable): + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Iterable['URL']: + return (URL(x) for x in cursor.execute(''' + SELECT ZOWNER, ZLABEL, ZURL + FROM ZABCDURLADDRESS + ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + + def __init__(self, row: List[Any]): + self._parent = row[0] # type: int + self.label = x520(row[1]) or '' # type: str + self.url = x520(row[2]) or '' # type: str + + def asStr(self, markPref: bool) -> str: + return buildLabel('URL', self.label, markPref, self.url) + + +class Service(Queryable): + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Iterable['Service']: + return (Service(x) for x in cursor.execute(''' + SELECT ZOWNER, ZSERVICENAME, ZLABEL, ZADDRESS + FROM ZABCDMESSAGINGADDRESS + INNER JOIN ZABCDSERVICE ON ZSERVICE = ZABCDSERVICE.Z_PK + ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + + def __init__(self, row: List[Any]): + self._parent = row[0] # type: int + self.service = row[1] or '' # type: str + if self.service.endswith('Instant'): + self.service = self.service[:-7] # drop suffix + self.label = x520(row[2]) or '' # type: str + self.username = x520(row[3]) or '' # type: str + + def isSpecial(self) -> bool: + return self.service in ['Jabber', 'MSN', 'Yahoo', 'ICQ'] + + def asSpecialStr(self, markPref: bool) -> str: + return buildLabel('X-' + self.service.upper(), self.label, markPref, + self.username) + + def asStr(self, markPref: bool) -> str: + if self.service in ['Jabber', 'GoogleTalk', 'Facebook']: + typ = 'xmpp' + elif self.service in ['GaduGadu', 'QQ']: + typ = 'x-apple' + elif self.service == 'ICQ': + typ = 'aim' + elif self.service == 'MSN': + typ = 'msnim' + elif self.service == 'Skype': + typ = 'skype' + elif self.service == 'Yahoo': + typ = 'ymsgr' + else: + raise NotImplementedError('Unkown Service: ' + self.service) + + # Dear Apple, why do you do such weird shit, URL encoding? bah! + # Even worse, you break it so that reimport fails. + # user = quote(self.username, safe='!/()=_:.\'$&').replace('%2C', '\\,') + user = self.username + return buildLabel('IMPP;X-SERVICE-TYPE=' + self.service, self.label, + markPref, typ + ':' + user) + + +# =============================== +# VCARD main +# =============================== + +class Record: + @staticmethod + def queryAll(cursor: sqlite3.Cursor) -> Dict[int, 'Record']: return {x[0]: Record(x) for x in cursor.execute(''' SELECT Z_PK, ZFIRSTNAME, ZLASTNAME, ZMIDDLENAME, ZTITLE, ZSUFFIX, @@ -85,48 +262,48 @@ class Record(object): FROM ZABCDRECORD WHERE ZCONTAINER1 IS NOT NULL;''')} - def __init__(self, row): - self.id = row[0] - self.firstname = x520(row[1]) or '' - self.lastname = x520(row[2]) or '' - self.middlename = x520(row[3]) or '' - self.nameprefix = x520(row[4]) or '' - self.namesuffix = x520(row[5]) or '' - self.nickname = x520(row[6]) - self.maidenname = x520(row[7]) - self.phonetic_firstname = x520(row[8]) - self.phonetic_middlename = x520(row[9]) - self.phonetic_lastname = x520(row[10]) - self.phonetic_org = x520(row[11]) - self.organization = x520(row[12]) or '' - self.department = x520(row[13]) or '' - self.jobtitle = x520(row[14]) - self.bday = row[15] - self.email = [] - self.phone = [] - self.address = [] - self.socialprofile = [] - self.note = None - self.urls = [] - self.service = [] - self.image = row[16] - self.iscompany = row[17] & 1 + def __init__(self, row: List[Any]) -> None: + self.id = row[0] # type: int + self.firstname = x520(row[1]) or '' # type: str + self.lastname = x520(row[2]) or '' # type: str + self.middlename = x520(row[3]) or '' # type: str + self.nameprefix = x520(row[4]) or '' # type: str + self.namesuffix = x520(row[5]) or '' # type: str + self.nickname = x520(row[6]) # type: Optional[str] + self.maidenname = x520(row[7]) # type: Optional[str] + self.phonetic_firstname = x520(row[8]) # type: Optional[str] + self.phonetic_middlename = x520(row[9]) # type: Optional[str] + self.phonetic_lastname = x520(row[10]) # type: Optional[str] + self.phonetic_org = x520(row[11]) # type: Optional[str] + self.organization = x520(row[12]) or '' # type: str + self.department = x520(row[13]) or '' # type: str + self.jobtitle = x520(row[14]) # type: Optional[str] + self.bday = row[15] # type: Optional[str] + self.email = [] # type: List[Email] + self.phone = [] # type: List[Phone] + self.address = [] # type: List[Address] + self.socialprofile = [] # type: List[SocialProfile] + self.note = None # type: Optional[str] + self.urls = [] # type: List[URL] + self.service = [] # type: List[Service] + self.image = row[16] # type: Optional[bytes] + self.iscompany = row[17] & 1 # type: bool - def __repr__(self): + def __repr__(self) -> str: return self.makeVCard() - def makeVCard(self): + def makeVCard(self) -> str: global ITEM_COUNTER ITEM_COUNTER = 0 t = 'BEGIN:VCARD\r\nVERSION:3.0' t += '\r\nPRODID:' + PROD_ID - def optional(key, value): + def optional(key: str, value: Optional[str]) -> None: nonlocal t if value: t += '\r\n' + key + ':' + value - def optionalArray(arr): + def optionalArray(arr: Iterable[Queryable]) -> None: nonlocal t isFirst = True for x in arr: @@ -178,20 +355,18 @@ class Record(object): if self.image: try: - t += '\r\n' + self.imageAsBase64() + t += '\r\n' + self.imageAsBase64(self.image) except NotImplementedError: print('''Image format not supported. Could not extract image for contact: {} - @: {}... + @: {!r}... skipping.'''.format(fullname, self.image[:20]), file=sys.stderr) if self.iscompany: t += '\r\nX-ABShowAs:COMPANY' return t + '\r\nEND:VCARD\r\n' - def imageAsBase64(self): - if not self.image: - return - img = self.image[1:] # why does Apple prepend \x01 to all images?! + def imageAsBase64(self, image: bytes) -> str: + img = image[1:] # why does Apple prepend \x01 to all images?! t = 'PHOTO;ENCODING=b;TYPE=' if img[6:10] == b'JFIF': t += 'JPEG:' + b64encode(img).decode('ascii') @@ -202,173 +377,39 @@ class Record(object): return t[0] + '\r\n '.join(t[i:i + 74] for i in range(1, len(t), 74)) -class Email(object): +# =============================== +# Main Entry +# =============================== + +class ABCDDB: @staticmethod - def queryAll(cursor): - return (Email(x) for x in cursor.execute(''' - SELECT ZOWNER, ZLABEL, ZADDRESS - FROM ZABCDEMAILADDRESS - ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + def load(db_path: str) -> Iterable['Record']: + db = sqlite3.connect(db_path) + cur = db.cursor() - def __init__(self, row): - self.parent = row[0] - self.label = x520(row[1]) - self.email = x520(row[2]) + records = Record.queryAll(cur) - def asStr(self, markPref): - return buildLabel('EMAIL;type=INTERNET', self.label, markPref, - self.email) + # query once, then distribute + for email in Email.queryAll(cur): + records[email.parent].email.append(email) + for phone in Phone.queryAll(cur): + records[phone.parent].phone.append(phone) -class Phone(object): - @staticmethod - def queryAll(cursor): - return (Phone(x) for x in cursor.execute(''' - SELECT ZOWNER, ZLABEL, ZFULLNUMBER - FROM ZABCDPHONENUMBER - ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + for address in Address.queryAll(cur): + records[address.parent].address.append(address) - def __init__(self, row): - self.parent = row[0] - self.label = x520(row[1]) - self.number = x520(row[2]) + for social in SocialProfile.queryAll(cur): + records[social.parent].socialprofile.append(social) - def asStr(self, markPref): - mapping = { - '_$!!$_': ';type=CELL;type=VOICE', - 'iPhone': ';type=IPHONE;type=CELL;type=VOICE', - '_$!!$_': ';type=HOME;type=VOICE', - '_$!!$_': ';type=WORK;type=VOICE', - '_$!
!$_': ';type=MAIN', - '_$!!$_': ';type=HOME;type=FAX', - '_$!!$_': ';type=WORK;type=FAX', - '_$!!$_': ';type=OTHER;type=FAX', - '_$!!$_': ';type=PAGER', - '_$!!$_': ';type=OTHER;type=VOICE' - } - value = (';type=pref:' if markPref else ':') + self.number - if self.label in mapping.keys(): - return 'TEL' + mapping[self.label] + value - else: - return incrItem('TEL' + value, self.label) + for note in Note.queryAll(cur): + records[note.parent].note = note.text + for url in URL.queryAll(cur): + records[url.parent].urls.append(url) -class Address(object): - @staticmethod - def queryAll(cursor): - return (Address(x) for x in cursor.execute(''' - SELECT ZOWNER, ZLABEL, - ZSTREET, ZCITY, ZSTATE, ZZIPCODE, ZCOUNTRYNAME - FROM ZABCDPOSTALADDRESS - ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) + for service in Service.queryAll(cur): + records[service.parent].service.append(service) - def __init__(self, row): - self.parent = row[0] - self.label = x520(row[1]) - self.street = x520(row[2]) or '' - self.city = x520(row[3]) or '' - self.state = x520(row[4]) or '' - self.zip = x520(row[5]) or '' - self.country = x520(row[6]) or '' - - def asStr(self, markPref): - value = ';'.join((self.street, self.city, self.state, self.zip, - self.country)) - return buildLabel('ADR', self.label, markPref, ';;' + value, - validOther=True) - - -class SocialProfile(object): - @staticmethod - def queryAll(cursor): - return (SocialProfile(x) for x in cursor.execute(''' - SELECT ZOWNER, ZSERVICENAME, ZUSERNAME - FROM ZABCDSOCIALPROFILE;''')) - - def __init__(self, row): - self.parent = row[0] - self.service = row[1] - # no x520(); actually, Apple does that ... and it breaks on reimport - self.user = row[2] # Apple: x520() - - def asStr(self, markPref): - # Apple does some x-user, x-apple, and url stuff that is wrong - return 'X-SOCIALPROFILE;type=' + self.service.lower() + ':' + self.user - - -class Note(object): - @staticmethod - def queryAll(cursor): - return (Note(x) for x in cursor.execute(''' - SELECT ZCONTACT, ZTEXT - FROM ZABCDNOTE - WHERE ZTEXT IS NOT NULL;''')) - - def __init__(self, row): - self.parent = row[0] - self.text = x520(row[1]) - - -class URL(object): - @staticmethod - def queryAll(cursor): - return (URL(x) for x in cursor.execute(''' - SELECT ZOWNER, ZLABEL, ZURL - FROM ZABCDURLADDRESS - ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) - - def __init__(self, row): - self.parent = row[0] - self.label = x520(row[1]) - self.url = x520(row[2]) - - def asStr(self, markPref): - return buildLabel('URL', self.label, markPref, self.url) - - -class Service(object): - @staticmethod - def queryAll(cursor): - return (Service(x) for x in cursor.execute(''' - SELECT ZOWNER, ZSERVICENAME, ZLABEL, ZADDRESS - FROM ZABCDMESSAGINGADDRESS - INNER JOIN ZABCDSERVICE ON ZSERVICE = ZABCDSERVICE.Z_PK - ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) - - def __init__(self, row): - self.parent = row[0] - self.service = row[1] - if self.service.endswith('Instant'): - self.service = self.service[:-7] # drop suffix - self.label = x520(row[2]) - self.username = x520(row[3]) - - def isSpecial(self): - return self.service in ['Jabber', 'MSN', 'Yahoo', 'ICQ'] - - def asSpecialStr(self, markPref): - return buildLabel('X-' + self.service.upper(), self.label, markPref, - self.username) - - def asStr(self, markPref): - if self.service in ['Jabber', 'GoogleTalk', 'Facebook']: - typ = 'xmpp' - elif self.service in ['GaduGadu', 'QQ']: - typ = 'x-apple' - elif self.service == 'ICQ': - typ = 'aim' - elif self.service == 'MSN': - typ = 'msnim' - elif self.service == 'Skype': - typ = 'skype' - elif self.service == 'Yahoo': - typ = 'ymsgr' - else: - raise NotImplementedError('Unkown Service: ' + self.service) - - # Dear Apple, why do you do such weird shit, URL encoding? bah! - # Even worse, you break it so that reimport fails. - # user = quote(self.username, safe='!/()=_:.\'$&').replace('%2C', '\\,') - user = self.username - return buildLabel('IMPP;X-SERVICE-TYPE=' + self.service, self.label, - markPref, typ + ':' + user) + db.close() + return records.values() diff --git a/vcard2image.py b/vcard2image.py index 96c1542..79a8b1b 100755 --- a/vcard2image.py +++ b/vcard2image.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 import os -import re import sys from base64 import b64decode from pathlib import Path @@ -25,14 +24,14 @@ with open(infile, 'r') as f: c1 = 0 c2 = 0 name = '' - img = None + img = '' collect = False for line in f.readlines(): line = line.rstrip() if line == 'BEGIN:VCARD': c1 += 1 name = '' - img = None + img = '' collect = False elif line.startswith('FN:'): name = line.split(':', 1)[1]