feat: multi-file export + fix continue on error

This commit is contained in:
relikd
2024-01-27 16:19:06 +01:00
parent 43d979f630
commit 2ab5609483
2 changed files with 66 additions and 15 deletions

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import re
import sys import sys
import sqlite3 import sqlite3
from base64 import b64encode from base64 import b64encode
@@ -8,6 +9,7 @@ from typing import List, Dict, Any, Iterable, Optional
ITEM_COUNTER = 0 ITEM_COUNTER = 0
rx_query = re.compile(r'SELECT([\s\S]*)FROM[\s]+([A-Z_]+)') rx_query = re.compile(r'SELECT([\s\S]*)FROM[\s]+([A-Z_]+)')
rx_cols = re.compile(r'[\s,;](Z[A-Z_]+)') rx_cols = re.compile(r'[\s,;](Z[A-Z_]+)')
rx_tags = re.compile(r'\%\{[A-Za-z_]+?\}')
# =============================== # ===============================
# Helper methods # Helper methods
@@ -337,26 +339,35 @@ class Record:
self.image = row[16] # type: Optional[bytes] self.image = row[16] # type: Optional[bytes]
display_flags = row[17] or 0 # type: int display_flags = row[17] or 0 # type: int
self.iscompany = bool(display_flags & 1) # type: bool self.iscompany = bool(display_flags & 1) # type: bool
self.fullname = self.organization if self.iscompany else ' '.join(
filter(None, [self.nameprefix, self.firstname, self.middlename,
self.lastname, self.namesuffix]))
def __repr__(self) -> str: def __repr__(self) -> str:
return self.makeVCard() return self.makeVCard()
def formatFilename(self, format: str) -> str:
matches = rx_tags.findall(format)
for tag in matches:
value = getattr(self, tag[2:-1])
if isinstance(value, list):
value = value[0] if len(value) else None
if isinstance(value, Queryable):
value = value.asPrintable()
format = format.replace(tag, str(value or '').replace('/', ':'))
return format
def makeVCard(self) -> str: def makeVCard(self) -> str:
global ITEM_COUNTER global ITEM_COUNTER
ITEM_COUNTER = 0 ITEM_COUNTER = 0
name = ';'.join((self.lastname, self.firstname, self.middlename,
self.nameprefix, self.namesuffix))
fullname = self.organization if self.iscompany else ' '.join(
filter(None, [self.nameprefix, self.firstname, self.middlename,
self.lastname, self.namesuffix]))
# rquired fields: BEGIN, END, VERSION, N, FN # rquired fields: BEGIN, END, VERSION, N, FN
data = [ data = [
'BEGIN:VCARD', 'BEGIN:VCARD',
'VERSION:3.0', 'VERSION:3.0',
'N:' + name, 'N:' + ';'.join((self.lastname, self.firstname, self.middlename,
'FN:' + fullname, self.nameprefix, self.namesuffix)),
'FN:' + self.fullname,
] ]
def optional(key: str, value: Optional[str]) -> None: def optional(key: str, value: Optional[str]) -> None:
@@ -406,7 +417,7 @@ class Record:
print('''Image format not supported. print('''Image format not supported.
Could not extract image for contact: {} Could not extract image for contact: {}
@: {!r}... @: {!r}...
skipping.'''.format(fullname, self.image[:20]), file=sys.stderr) skipping.'''.format(self.fullname, self.image[:20]), file=sys.stderr)
if self.iscompany: if self.iscompany:
data.append('X-ABShowAs:COMPANY') data.append('X-ABShowAs:COMPANY')
data.append('END:VCARD') data.append('END:VCARD')

View File

@@ -4,12 +4,13 @@ Extract data from AddressBook database (.abcddb) to Contacts VCards file (.vcf)
''' '''
import os import os
import sys import sys
from io import TextIOWrapper
from pathlib import Path from pathlib import Path
from argparse import ArgumentParser from argparse import ArgumentParser
try: try:
from .ABCDDB import ABCDDB from .ABCDDB import ABCDDB, Record
except ImportError: # fallback if not run as module except ImportError: # fallback if not run as module
from ABCDDB import ABCDDB # type: ignore[import, no-redef] from ABCDDB import ABCDDB, Record # type: ignore[import, no-redef]
DB_FILE = str(Path.home().joinpath( DB_FILE = str(Path.home().joinpath(
'Library', 'Application Support', 'AddressBook', 'AddressBook-v22.abcddb')) 'Library', 'Application Support', 'AddressBook', 'AddressBook-v22.abcddb'))
@@ -21,9 +22,16 @@ def main() -> None:
help='VCard output file.') help='VCard output file.')
cli.add_argument('-f', '--force', action='store_true', cli.add_argument('-f', '--force', action='store_true',
help='Overwrite existing output file.') help='Overwrite existing output file.')
cli.add_argument('--dry-run', action='store_true',
help='Do not write file(s), just print filenames.')
cli.add_argument('-i', '--input', type=str, metavar='AddressBook.abcddb', cli.add_argument('-i', '--input', type=str, metavar='AddressBook.abcddb',
default=DB_FILE, help='Specify another abcddb input file.' default=DB_FILE, help='Specify another abcddb input file.'
' Default: ' + DB_FILE) ' Default: ' + DB_FILE)
cli.add_argument('-s', '--split', type=str, metavar='FORMAT', help='''
Output into several vcf files instead of a single file.
File format can use any field of type Record.
E.g. "%%{id}_%%{fullname}.vcf".
''')
args = cli.parse_args() args = cli.parse_args()
# check input args # check input args
@@ -34,17 +42,49 @@ def main() -> None:
elif not os.path.isdir(os.path.dirname(args.output) or os.curdir): elif not os.path.isdir(os.path.dirname(args.output) or os.curdir):
print('Output parent directory does not exist.', file=sys.stderr) print('Output parent directory does not exist.', file=sys.stderr)
exit(1) exit(1)
elif os.path.isfile(args.output) and not args.force: elif os.path.exists(args.output) and not args.force:
print('Output file already exist. Use -f to force overwrite.', print('Output file already exist. Use -f to force overwrite.',
file=sys.stderr) file=sys.stderr)
exit(1) exit(1)
# perform export # perform export
contacts = ABCDDB.load(args.input) contacts = ABCDDB.load(args.input)
with open(args.output, 'w') as f: export_count = 0
for rec in contacts:
# reused for appending to an open file
def writeRec(f: TextIOWrapper, rec: Record):
nonlocal export_count
try:
f.write(rec.makeVCard()) f.write(rec.makeVCard())
print(len(contacts), 'contacts.') export_count += 1
except Exception as e:
print(f'Error processing contact {rec.id} {rec.fullname}: {e}',
file=sys.stderr)
# choose which export mode to use
if args.split: # multi-file mode
outDir = Path(args.output)
prevFilenames = set()
for rec in contacts:
filename = outDir / Path(rec.formatFilename(args.split))
if filename in prevFilenames:
print(f'WARN: overwriting "{filename}"', file=sys.stderr)
prevFilenames.add(filename)
os.makedirs(filename.parent, exist_ok=True)
if args.dry_run:
print(filename)
else:
with open(filename, 'w') as f:
writeRec(f, rec)
else: # single-file mode
if args.dry_run:
print(args.output)
else:
with open(args.output, 'w') as f:
for rec in contacts:
writeRec(f, rec)
print(f'{export_count}/{len(contacts)} contacts.')
if __name__ == '__main__': if __name__ == '__main__':