4 Commits

Author SHA1 Message Date
relikd
2604eed96e chore: version bump 2024-01-27 16:22:49 +01:00
relikd
2ab5609483 feat: multi-file export + fix continue on error 2024-01-27 16:19:06 +01:00
relikd
43d979f630 ref: asPrintable instead of __repr__ 2024-01-27 16:17:53 +01:00
relikd
f3ae2e3ff4 fix: ignore non-existent columns 2024-01-27 16:16:47 +01:00
4 changed files with 124 additions and 53 deletions

View File

@@ -8,15 +8,21 @@ The output of this script should be exactly the same as dragging and dropping th
### Usage ### Usage
``` ```sh
python3 abcddb2vcard.py backup/contacts_$(date +"%Y-%m-%d").vcf python3 abcddb2vcard.py backup/contacts_$(date +"%Y-%m-%d").vcf
``` ```
> assuming db is located at "~/Library/Application Support/AddressBook/AddressBook-v22.abcddb" > assuming db is located at "~/Library/Application Support/AddressBook/AddressBook-v22.abcddb"
#### Export into individual files
```sh
python3 abcddb2vcard.py outdir -s 'path/%{fullname}.vcf'
```
#### Extract contact images #### Extract contact images
``` ```sh
python3 vcard2image.py AllContacts.vcf ./profile_pics/ python3 vcard2image.py AllContacts.vcf ./profile_pics/
``` ```

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import re
import sys import sys
import sqlite3 import sqlite3
from base64 import b64encode from base64 import b64encode
@@ -6,7 +7,9 @@ from urllib.parse import quote
from typing import List, Dict, Any, Iterable, Optional from typing import List, Dict, Any, Iterable, Optional
ITEM_COUNTER = 0 ITEM_COUNTER = 0
rx_query = re.compile(r'SELECT([\s\S]*)FROM[\s]+([A-Z_]+)')
rx_cols = re.compile(r'[\s,;](Z[A-Z_]+)')
rx_tags = re.compile(r'\%\{[A-Za-z_]+?\}')
# =============================== # ===============================
# Helper methods # Helper methods
@@ -47,6 +50,17 @@ def buildLabel(
return incrItem(value, label) return incrItem(value, label)
def sanitize(cursor: sqlite3.Cursor, query: str) -> str:
cols, table = rx_query.findall(query)[0]
sel_cols = {x for x in rx_cols.findall(cols)}
all_cols = {x[1] for x in cursor.execute(f'PRAGMA table_info({table});')}
missing_cols = sel_cols.difference(all_cols)
for missing in missing_cols:
print(f'WARN: column "{missing}" not found in {table}. Ignoring.',
file=sys.stderr)
query = query.replace(missing, 'NULL')
return query
# =============================== # ===============================
# VCARD Attributes # VCARD Attributes
# =============================== # ===============================
@@ -64,8 +78,11 @@ class Queryable: # Protocol
def parent(self) -> int: def parent(self) -> int:
return self._parent return self._parent
def classStr(self, value: str) -> str: def __repr__(self) -> str:
return '<{} "{}">'.format(self.__class__.__name__, value) return '<{} "{}">'.format(self.__class__.__name__, self.asPrintable())
def asPrintable(self) -> str:
return '?'
def asVCard(self, markPref: bool) -> str: def asVCard(self, markPref: bool) -> str:
raise NotImplementedError() raise NotImplementedError()
@@ -74,18 +91,18 @@ class Queryable: # Protocol
class Email(Queryable): class Email(Queryable):
@staticmethod @staticmethod
def queryAll(cursor: sqlite3.Cursor) -> Iterable['Email']: def queryAll(cursor: sqlite3.Cursor) -> Iterable['Email']:
return (Email(x) for x in cursor.execute(''' return (Email(x) for x in cursor.execute(sanitize(cursor, '''
SELECT ZOWNER, ZLABEL, ZADDRESS SELECT ZOWNER, ZLABEL, ZADDRESS
FROM ZABCDEMAILADDRESS FROM ZABCDEMAILADDRESS
ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')))
def __init__(self, row: List[Any]): def __init__(self, row: List[Any]):
self._parent = row[0] # type: int self._parent = row[0] # type: int
self.label = x520(row[1]) or '' # type: str self.label = x520(row[1]) or '' # type: str
self.email = x520(row[2]) or '' # type: str self.email = x520(row[2]) or '' # type: str
def __repr__(self) -> str: def asPrintable(self) -> str:
return self.classStr(self.email) return self.email
def asVCard(self, markPref: bool) -> str: def asVCard(self, markPref: bool) -> str:
return buildLabel( return buildLabel(
@@ -95,18 +112,18 @@ class Email(Queryable):
class Phone(Queryable): class Phone(Queryable):
@staticmethod @staticmethod
def queryAll(cursor: sqlite3.Cursor) -> Iterable['Phone']: def queryAll(cursor: sqlite3.Cursor) -> Iterable['Phone']:
return (Phone(x) for x in cursor.execute(''' return (Phone(x) for x in cursor.execute(sanitize(cursor, '''
SELECT ZOWNER, ZLABEL, ZFULLNUMBER SELECT ZOWNER, ZLABEL, ZFULLNUMBER
FROM ZABCDPHONENUMBER FROM ZABCDPHONENUMBER
ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')))
def __init__(self, row: List[Any]): def __init__(self, row: List[Any]):
self._parent = row[0] # type: int self._parent = row[0] # type: int
self.label = x520(row[1]) or '' # type: str self.label = x520(row[1]) or '' # type: str
self.number = x520(row[2]) or '' # type: str self.number = x520(row[2]) or '' # type: str
def __repr__(self) -> str: def asPrintable(self) -> str:
return self.classStr(self.number) return self.number
def asVCard(self, markPref: bool) -> str: def asVCard(self, markPref: bool) -> str:
mapping = { mapping = {
@@ -131,11 +148,11 @@ class Phone(Queryable):
class Address(Queryable): class Address(Queryable):
@staticmethod @staticmethod
def queryAll(cursor: sqlite3.Cursor) -> Iterable['Address']: def queryAll(cursor: sqlite3.Cursor) -> Iterable['Address']:
return (Address(x) for x in cursor.execute(''' return (Address(x) for x in cursor.execute(sanitize(cursor, '''
SELECT ZOWNER, ZLABEL, SELECT ZOWNER, ZLABEL,
ZSTREET, ZCITY, ZSTATE, ZZIPCODE, ZCOUNTRYNAME ZSTREET, ZCITY, ZSTATE, ZZIPCODE, ZCOUNTRYNAME
FROM ZABCDPOSTALADDRESS FROM ZABCDPOSTALADDRESS
ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')))
def __init__(self, row: List[Any]): def __init__(self, row: List[Any]):
self._parent = row[0] # type: int self._parent = row[0] # type: int
@@ -146,9 +163,9 @@ class Address(Queryable):
self.zip = x520(row[5]) or '' # type: str self.zip = x520(row[5]) or '' # type: str
self.country = x520(row[6]) or '' # type: str self.country = x520(row[6]) or '' # type: str
def __repr__(self) -> str: def asPrintable(self) -> str:
return self.classStr(', '.join(filter(None, ( return ', '.join(filter(None, (
self.street, self.city, self.state, self.zip, self.country)))) self.street, self.city, self.state, self.zip, self.country)))
def asVCard(self, markPref: bool) -> str: def asVCard(self, markPref: bool) -> str:
value = ';'.join(( value = ';'.join((
@@ -160,9 +177,9 @@ class Address(Queryable):
class SocialProfile(Queryable): class SocialProfile(Queryable):
@staticmethod @staticmethod
def queryAll(cursor: sqlite3.Cursor) -> Iterable['SocialProfile']: def queryAll(cursor: sqlite3.Cursor) -> Iterable['SocialProfile']:
return (SocialProfile(x) for x in cursor.execute(''' return (SocialProfile(x) for x in cursor.execute(sanitize(cursor, '''
SELECT ZOWNER, ZSERVICENAME, ZUSERNAME SELECT ZOWNER, ZSERVICENAME, ZUSERNAME
FROM ZABCDSOCIALPROFILE;''')) FROM ZABCDSOCIALPROFILE;''')))
def __init__(self, row: List[Any]): def __init__(self, row: List[Any]):
self._parent = row[0] # type: int self._parent = row[0] # type: int
@@ -170,8 +187,8 @@ class SocialProfile(Queryable):
# no x520(); actually, Apple does that ... and it breaks on reimport # no x520(); actually, Apple does that ... and it breaks on reimport
self.user = row[2] or '' # type: str self.user = row[2] or '' # type: str
def __repr__(self) -> str: def asPrintable(self) -> str:
return self.classStr(self.service + ':' + self.user) return self.service + ':' + self.user
def asVCard(self, markPref: bool) -> str: def asVCard(self, markPref: bool) -> str:
# Apple does some x-user, x-apple, and url stuff that is wrong # Apple does some x-user, x-apple, and url stuff that is wrong
@@ -181,17 +198,17 @@ class SocialProfile(Queryable):
class Note(Queryable): class Note(Queryable):
@staticmethod @staticmethod
def queryAll(cursor: sqlite3.Cursor) -> Iterable['Note']: def queryAll(cursor: sqlite3.Cursor) -> Iterable['Note']:
return (Note(x) for x in cursor.execute(''' return (Note(x) for x in cursor.execute(sanitize(cursor, '''
SELECT ZCONTACT, ZTEXT SELECT ZCONTACT, ZTEXT
FROM ZABCDNOTE FROM ZABCDNOTE
WHERE ZTEXT IS NOT NULL;''')) WHERE ZTEXT IS NOT NULL;''')))
def __init__(self, row: List[Any]): def __init__(self, row: List[Any]):
self._parent = row[0] # type: int self._parent = row[0] # type: int
self.text = x520(row[1]) or '' # type: str self.text = x520(row[1]) or '' # type: str
def __repr__(self) -> str: def asPrintable(self) -> str:
return self.classStr(self.text) return self.text
def asVCard(self, markPref: bool) -> str: def asVCard(self, markPref: bool) -> str:
return self.text return self.text
@@ -200,18 +217,18 @@ class Note(Queryable):
class URL(Queryable): class URL(Queryable):
@staticmethod @staticmethod
def queryAll(cursor: sqlite3.Cursor) -> Iterable['URL']: def queryAll(cursor: sqlite3.Cursor) -> Iterable['URL']:
return (URL(x) for x in cursor.execute(''' return (URL(x) for x in cursor.execute(sanitize(cursor, '''
SELECT ZOWNER, ZLABEL, ZURL SELECT ZOWNER, ZLABEL, ZURL
FROM ZABCDURLADDRESS FROM ZABCDURLADDRESS
ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')))
def __init__(self, row: List[Any]): def __init__(self, row: List[Any]):
self._parent = row[0] # type: int self._parent = row[0] # type: int
self.label = x520(row[1]) or '' # type: str self.label = x520(row[1]) or '' # type: str
self.url = x520(row[2]) or '' # type: str self.url = x520(row[2]) or '' # type: str
def __repr__(self) -> str: def asPrintable(self) -> str:
return self.classStr(self.url) return self.url
def asVCard(self, markPref: bool) -> str: def asVCard(self, markPref: bool) -> str:
return buildLabel('URL', self.label, markPref, self.url) return buildLabel('URL', self.label, markPref, self.url)
@@ -220,11 +237,11 @@ class URL(Queryable):
class Service(Queryable): class Service(Queryable):
@staticmethod @staticmethod
def queryAll(cursor: sqlite3.Cursor) -> Iterable['Service']: def queryAll(cursor: sqlite3.Cursor) -> Iterable['Service']:
return (Service(x) for x in cursor.execute(''' return (Service(x) for x in cursor.execute(sanitize(cursor, '''
SELECT ZOWNER, ZSERVICENAME, ZLABEL, ZADDRESS SELECT ZOWNER, ZSERVICENAME, ZLABEL, ZADDRESS
FROM ZABCDMESSAGINGADDRESS FROM ZABCDMESSAGINGADDRESS
INNER JOIN ZABCDSERVICE ON ZSERVICE = ZABCDSERVICE.Z_PK INNER JOIN ZABCDSERVICE ON ZSERVICE = ZABCDSERVICE.Z_PK
ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')) ORDER BY ZOWNER, ZISPRIMARY DESC, ZORDERINGINDEX;''')))
def __init__(self, row: List[Any]): def __init__(self, row: List[Any]):
self._parent = row[0] # type: int self._parent = row[0] # type: int
@@ -234,9 +251,8 @@ class Service(Queryable):
self.label = x520(row[2]) or '' # type: str self.label = x520(row[2]) or '' # type: str
self.username = x520(row[3]) or '' # type: str self.username = x520(row[3]) or '' # type: str
def __repr__(self) -> str: def asPrintable(self) -> str:
return self.classStr(', '.join(( return ', '.join((self.service, self.label, self.username))
self.service, self.label, self.username)))
def isSpecial(self) -> bool: def isSpecial(self) -> bool:
return self.service in ['Jabber', 'MSN', 'Yahoo', 'ICQ'] return self.service in ['Jabber', 'MSN', 'Yahoo', 'ICQ']
@@ -281,7 +297,7 @@ class Record:
'SELECT Z_ENT FROM Z_PRIMARYKEY WHERE Z_NAME == "ABCDContact"' 'SELECT Z_ENT FROM Z_PRIMARYKEY WHERE Z_NAME == "ABCDContact"'
).fetchone()[0] ).fetchone()[0]
# find all records that match this id # find all records that match this id
return {x[0]: Record(x) for x in cursor.execute(''' return {x[0]: Record(x) for x in cursor.execute(sanitize(cursor, '''
SELECT Z_PK, SELECT Z_PK,
ZFIRSTNAME, ZLASTNAME, ZMIDDLENAME, ZTITLE, ZSUFFIX, ZFIRSTNAME, ZLASTNAME, ZMIDDLENAME, ZTITLE, ZSUFFIX,
ZNICKNAME, ZMAIDENNAME, ZNICKNAME, ZMAIDENNAME,
@@ -290,7 +306,7 @@ class Record:
strftime('%Y-%m-%d', ZBIRTHDAY + 978307200, 'unixepoch'), strftime('%Y-%m-%d', ZBIRTHDAY + 978307200, 'unixepoch'),
ZTHUMBNAILIMAGEDATA, ZDISPLAYFLAGS ZTHUMBNAILIMAGEDATA, ZDISPLAYFLAGS
FROM ZABCDRECORD FROM ZABCDRECORD
WHERE Z_ENT = ?;''', [z_ent])} WHERE Z_ENT = ?;'''), [z_ent])}
@staticmethod @staticmethod
def initEmpty(id: int) -> 'Record': def initEmpty(id: int) -> 'Record':
@@ -323,26 +339,35 @@ class Record:
self.image = row[16] # type: Optional[bytes] self.image = row[16] # type: Optional[bytes]
display_flags = row[17] or 0 # type: int display_flags = row[17] or 0 # type: int
self.iscompany = bool(display_flags & 1) # type: bool self.iscompany = bool(display_flags & 1) # type: bool
self.fullname = self.organization if self.iscompany else ' '.join(
filter(None, [self.nameprefix, self.firstname, self.middlename,
self.lastname, self.namesuffix]))
def __repr__(self) -> str: def __repr__(self) -> str:
return self.makeVCard() return self.makeVCard()
def formatFilename(self, format: str) -> str:
matches = rx_tags.findall(format)
for tag in matches:
value = getattr(self, tag[2:-1])
if isinstance(value, list):
value = value[0] if len(value) else None
if isinstance(value, Queryable):
value = value.asPrintable()
format = format.replace(tag, str(value or '').replace('/', ':'))
return format
def makeVCard(self) -> str: def makeVCard(self) -> str:
global ITEM_COUNTER global ITEM_COUNTER
ITEM_COUNTER = 0 ITEM_COUNTER = 0
name = ';'.join((self.lastname, self.firstname, self.middlename,
self.nameprefix, self.namesuffix))
fullname = self.organization if self.iscompany else ' '.join(
filter(None, [self.nameprefix, self.firstname, self.middlename,
self.lastname, self.namesuffix]))
# rquired fields: BEGIN, END, VERSION, N, FN # rquired fields: BEGIN, END, VERSION, N, FN
data = [ data = [
'BEGIN:VCARD', 'BEGIN:VCARD',
'VERSION:3.0', 'VERSION:3.0',
'N:' + name, 'N:' + ';'.join((self.lastname, self.firstname, self.middlename,
'FN:' + fullname, self.nameprefix, self.namesuffix)),
'FN:' + self.fullname,
] ]
def optional(key: str, value: Optional[str]) -> None: def optional(key: str, value: Optional[str]) -> None:
@@ -392,7 +417,7 @@ class Record:
print('''Image format not supported. print('''Image format not supported.
Could not extract image for contact: {} Could not extract image for contact: {}
@: {!r}... @: {!r}...
skipping.'''.format(fullname, self.image[:20]), file=sys.stderr) skipping.'''.format(self.fullname, self.image[:20]), file=sys.stderr)
if self.iscompany: if self.iscompany:
data.append('X-ABShowAs:COMPANY') data.append('X-ABShowAs:COMPANY')
data.append('END:VCARD') data.append('END:VCARD')

View File

@@ -2,6 +2,6 @@
''' '''
Convert AddressBook database (.abcddb) to Contacts VCards file (.vcf) Convert AddressBook database (.abcddb) to Contacts VCards file (.vcf)
''' '''
__version__ = '1.0.1' __version__ = '1.1.0'
from .ABCDDB import ABCDDB from .ABCDDB import ABCDDB

View File

@@ -4,12 +4,13 @@ Extract data from AddressBook database (.abcddb) to Contacts VCards file (.vcf)
''' '''
import os import os
import sys import sys
from io import TextIOWrapper
from pathlib import Path from pathlib import Path
from argparse import ArgumentParser from argparse import ArgumentParser
try: try:
from .ABCDDB import ABCDDB from .ABCDDB import ABCDDB, Record
except ImportError: # fallback if not run as module except ImportError: # fallback if not run as module
from ABCDDB import ABCDDB # type: ignore[import, no-redef] from ABCDDB import ABCDDB, Record # type: ignore[import, no-redef]
DB_FILE = str(Path.home().joinpath( DB_FILE = str(Path.home().joinpath(
'Library', 'Application Support', 'AddressBook', 'AddressBook-v22.abcddb')) 'Library', 'Application Support', 'AddressBook', 'AddressBook-v22.abcddb'))
@@ -21,9 +22,16 @@ def main() -> None:
help='VCard output file.') help='VCard output file.')
cli.add_argument('-f', '--force', action='store_true', cli.add_argument('-f', '--force', action='store_true',
help='Overwrite existing output file.') help='Overwrite existing output file.')
cli.add_argument('--dry-run', action='store_true',
help='Do not write file(s), just print filenames.')
cli.add_argument('-i', '--input', type=str, metavar='AddressBook.abcddb', cli.add_argument('-i', '--input', type=str, metavar='AddressBook.abcddb',
default=DB_FILE, help='Specify another abcddb input file.' default=DB_FILE, help='Specify another abcddb input file.'
' Default: ' + DB_FILE) ' Default: ' + DB_FILE)
cli.add_argument('-s', '--split', type=str, metavar='FORMAT', help='''
Output into several vcf files instead of a single file.
File format can use any field of type Record.
E.g. "%%{id}_%%{fullname}.vcf".
''')
args = cli.parse_args() args = cli.parse_args()
# check input args # check input args
@@ -34,17 +42,49 @@ def main() -> None:
elif not os.path.isdir(os.path.dirname(args.output) or os.curdir): elif not os.path.isdir(os.path.dirname(args.output) or os.curdir):
print('Output parent directory does not exist.', file=sys.stderr) print('Output parent directory does not exist.', file=sys.stderr)
exit(1) exit(1)
elif os.path.isfile(args.output) and not args.force: elif os.path.exists(args.output) and not args.force:
print('Output file already exist. Use -f to force overwrite.', print('Output file already exist. Use -f to force overwrite.',
file=sys.stderr) file=sys.stderr)
exit(1) exit(1)
# perform export # perform export
contacts = ABCDDB.load(args.input) contacts = ABCDDB.load(args.input)
with open(args.output, 'w') as f: export_count = 0
for rec in contacts:
# reused for appending to an open file
def writeRec(f: TextIOWrapper, rec: Record):
nonlocal export_count
try:
f.write(rec.makeVCard()) f.write(rec.makeVCard())
print(len(contacts), 'contacts.') export_count += 1
except Exception as e:
print(f'Error processing contact {rec.id} {rec.fullname}: {e}',
file=sys.stderr)
# choose which export mode to use
if args.split: # multi-file mode
outDir = Path(args.output)
prevFilenames = set()
for rec in contacts:
filename = outDir / Path(rec.formatFilename(args.split))
if filename in prevFilenames:
print(f'WARN: overwriting "{filename}"', file=sys.stderr)
prevFilenames.add(filename)
os.makedirs(filename.parent, exist_ok=True)
if args.dry_run:
print(filename)
else:
with open(filename, 'w') as f:
writeRec(f, rec)
else: # single-file mode
if args.dry_run:
print(args.output)
else:
with open(args.output, 'w') as f:
for rec in contacts:
writeRec(f, rec)
print(f'{export_count}/{len(contacts)} contacts.')
if __name__ == '__main__': if __name__ == '__main__':