typing + smaller bugfixes

This commit is contained in:
relikd
2022-04-09 03:45:48 +02:00
parent a25b62d934
commit d0c5072d27
15 changed files with 415 additions and 199 deletions

View File

@@ -1,6 +1,8 @@
#!/usr/bin/env python3
import os
from sys import stderr
from typing import Dict, Any, Optional, TextIO
from datetime import datetime # typing
from botlib.cli import Cli
from botlib.curl import Curl
@@ -8,7 +10,8 @@ from botlib.feed2list import Feed2List
from botlib.helper import StrFormat, FileWrite
def main():
def main() -> None:
''' CLI entry. '''
cli = Cli()
cli.arg_dir('dest_dir', help='Download all entries here')
cli.arg('source', help='RSS file or web-url')
@@ -25,10 +28,16 @@ def main():
print('ERROR: ' + str(e), file=stderr)
def process(source, dest_dir, *, by_year=False, dry_run=False):
def process(
source: str, # local file path or remote url
dest_dir: str,
*, by_year: bool = False,
dry_run: bool = False
) -> bool:
''' Parse a full podcast file / source. '''
# open source
if os.path.isfile(source):
fp = open(source) # closed in Feed2List
fp = open(source) # type: Optional[TextIO] # closed in Feed2List
elif Curl.valid_url(source):
fp = Curl.get(source) # closed in Feed2List
else:
@@ -41,7 +50,7 @@ def process(source, dest_dir, *, by_year=False, dry_run=False):
'pubDate', 'media:content', # image
# 'itunes:image', 'itunes:duration', 'itunes:summary'
])):
date = entry.get('pubDate') # try RSS only
date = entry['pubDate'] # type: datetime # try RSS only
if by_year:
dest = os.path.join(dest_dir, str(date.year))
if not dry_run and not os.path.exists(dest):
@@ -50,7 +59,13 @@ def process(source, dest_dir, *, by_year=False, dry_run=False):
return True
def process_entry(entry, date, dest_dir, *, dry_run=False):
def process_entry(
entry: Dict[str, Any],
date: datetime,
dest_dir: str,
*, dry_run: bool = False
) -> None:
''' Parse a single podcast media entry. '''
title = entry['title']
# <enclosure url="*.mp3" length="47216000" type="audio/mpeg"/>
audio_url = entry.get('enclosure', {}).get('url')
@@ -78,10 +93,11 @@ def process_entry(entry, date, dest_dir, *, dry_run=False):
@FileWrite.once(dest_dir, fname + '.txt', date, override=False,
dry_run=dry_run, verbose=True, intro=flag or intro)
def _description():
desc = title + '\n' + '=' * len(title)
desc += '\n\n' + StrFormat.strip_html(entry.get('description', ''))
return desc + '\n\n\n' + entry.get('link', '') + '\n'
def _description() -> str:
return '{}\n{}\n\n{}\n\n\n{}\n'.format(
title, '=' * len(title),
StrFormat.strip_html(entry.get('description', '')),
entry.get('link', ''))
if __name__ == '__main__':

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import os
from sys import stderr
from typing import Dict, Tuple, Optional, Any
from botlib.cli import Cli
from botlib.curl import Curl, URLError
@@ -15,7 +16,8 @@ db_slugs = OnceDB('radiolab_slugs.sqlite')
os.environ['TZ'] = 'America/New_York'
def main():
def main() -> None:
''' CLI entry. '''
cli = Cli()
cli.arg_dir('dest_dir', help='Download all episodes to dest_dir/year/')
cli.arg_bool('--dry-run', help='Do not download, just parse')
@@ -36,9 +38,17 @@ def main():
print('\nDone.\n\nNow check MP3 tags (consistency).')
def processEpisodeList(basedir, title, query, index=1, *, dry_run=False):
def processEpisodeList(
basedir: str,
title: str,
query: str,
index: int = 1,
*, dry_run: bool = False
) -> None:
''' Parse full podcast category. '''
print('\nProcessing: {}'.format(title), end='')
dat = Curl.json('{}/channel/shows/{}/{}?limit=9'.format(API, query, index))
url = '{}/channel/shows/{}/{}?limit=9'.format(API, query, index)
dat = Curl.json(url) # type: Dict[str, Any]
total = dat['data']['attributes']['total-pages']
print(' ({}/{})'.format(index, total))
anything_new = False
@@ -49,7 +59,12 @@ def processEpisodeList(basedir, title, query, index=1, *, dry_run=False):
processEpisodeList(basedir, title, query, index + 1, dry_run=dry_run)
def processEpisode(obj, basedir, *, dry_run=False):
def processEpisode(
obj: Dict[str, Any],
basedir: str,
*, dry_run: bool = False
) -> bool:
''' Parse a single podcast episode. '''
uid = obj['cms-pk']
if db_ids.contains(COHORT, uid):
return False # Already exists
@@ -86,18 +101,18 @@ def processEpisode(obj, basedir, *, dry_run=False):
@FileWrite.once(dest_dir, fname + '.txt', date, override=False,
dry_run=dry_run, verbose=True, intro=flag or intro)
def write_description():
def write_description() -> str:
nonlocal flag
flag = True
desc = title + '\n' + '=' * len(title)
desc += '\n\n' + StrFormat.strip_html(obj['body'])
desc = '{}\n{}\n\n{}'.format(
title, '=' * len(title), StrFormat.strip_html(obj['body']))
if img_desc:
desc += '\n\n' + img_desc
return desc + '\n\n\n' + obj['url'].strip() + '\n' # link to article
return '{}\n\n\n{}\n'.format(desc, obj['url'].strip()) # article link
@FileWrite.once(dest_dir, fname + '.transcript.txt', date, override=False,
dry_run=dry_run, verbose=True, intro=flag or intro)
def write_transcript():
def write_transcript() -> Optional[str]:
nonlocal flag
flag = True
data = StrFormat.strip_html(obj['transcript'])
@@ -111,7 +126,8 @@ def processEpisode(obj, basedir, *, dry_run=False):
return flag # potentially need to query the next page too
def get_img_desc(obj):
def get_img_desc(obj: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
''' Extract image description. '''
if not obj:
return (None, None)
url = (obj['url'] or '').strip()
@@ -135,7 +151,8 @@ def get_img_desc(obj):
# -> inurl:radiolab/episodes site:wnycstudios.org
# Then regex: /episodes/([^;]*?)" onmousedown
def processSingle(slug, basedir):
def processSingle(slug: str, basedir: str) -> None:
''' [internal] process single episode if only the slug is known. '''
# cms-pk = 91947 , slug = '91947-do-i-know-you'
all_slugs = [slug for _, _, _, slug in db_slugs]
if slug not in all_slugs: