add examples

relikd
2022-04-08 20:22:08 +02:00
parent 45dfc31966
commit e871e6f03e
10 changed files with 488 additions and 0 deletions

.gitattributes vendored Normal file

@@ -0,0 +1 @@
examples/** linguist-documentation


@@ -0,0 +1,88 @@
#!/usr/bin/env python3
import os
from sys import stderr
from botlib.cli import Cli
from botlib.curl import Curl
from botlib.feed2list import Feed2List
from botlib.helper import StrFormat, FileWrite
def main():
cli = Cli()
cli.arg_dir('dest_dir', help='Download all entries here')
cli.arg('source', help='RSS file or web-url')
cli.arg_bool('--dry-run', help='Do not download, just parse')
cli.arg_bool('--by-year', help='Place episodes in dest_dir/year/')
args = cli.parse()
try:
print('Processing:', args.dest_dir)
process(args.source, args.dest_dir,
by_year=args.by_year, dry_run=args.dry_run)
print('Done.')
except Exception as e:
print('ERROR: ' + str(e), file=stderr)
def process(source, dest_dir, *, by_year=False, dry_run=False):
# open source
if os.path.isfile(source):
fp = open(source) # closed in Feed2List
elif Curl.valid_url(source):
fp = Curl.get(source) # closed in Feed2List
else:
raise AttributeError('Not a valid file or URL: "{}"'.format(source))
# process
dest = dest_dir
for entry in reversed(Feed2List(fp, keys=[
'link', 'title', 'description', 'enclosure', # audio
'pubDate', 'media:content', # image
# 'itunes:image', 'itunes:duration', 'itunes:summary'
])):
date = entry.get('pubDate') # try RSS only
if by_year:
dest = os.path.join(dest_dir, str(date.year))
if not dry_run and not os.path.exists(dest):
os.mkdir(dest)
process_entry(entry, date, dest, dry_run=dry_run)
return True
def process_entry(entry, date, dest_dir, *, dry_run=False):
title = entry['title']
# <enclosure url="*.mp3" length="47216000" type="audio/mpeg"/>
audio_url = entry.get('enclosure', {}).get('url')
if not audio_url:
print(' ERROR: URL not found for "{}"'.format(title), file=stderr)
return
# <media:content url="*.jpg" width="300" rel="full_image" height="300" />
images = entry.get('media:content', [])
if not isinstance(images, list):
images = [images]
maxRes = 0
image_url = None
for img in images:
res = int(img.get('width', 0)) * int(img.get('height', 0))
if res > maxRes:
maxRes = res
image_url = img.get('url')
# make request
fname = '{} - {}'.format(date.strftime('%Y-%m-%d'),
StrFormat.safe_filename(title))
intro = '\ndownloading: ' + fname
urllist = [audio_url, image_url] if image_url else [audio_url]
flag = Curl.once(dest_dir, fname, urllist, date, override=False,
dry_run=dry_run, verbose=True, intro=intro)
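# note: FileWrite.once presumably calls the decorated function and writes its
# return value to '<fname>.txt'; with override=False an existing file is kept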
@FileWrite.once(dest_dir, fname + '.txt', date, override=False,
dry_run=dry_run, verbose=True, intro=flag or intro)
def _description():
desc = title + '\n' + '=' * len(title)
desc += '\n\n' + StrFormat.strip_html(entry.get('description', ''))
return desc + '\n\n\n' + entry.get('link', '') + '\n'
if __name__ == '__main__':
main()


@@ -0,0 +1,150 @@
#!/usr/bin/env python3
import os
from sys import stderr
from botlib.cli import Cli
from botlib.curl import Curl, URLError
from botlib.helper import StrFormat, FileWrite
from botlib.oncedb import OnceDB
API = 'http://api.wnyc.org/api/v3'
COHORT = 'radiolab'
db_ids = OnceDB('radiolab_ids.sqlite')
db_slugs = OnceDB('radiolab_slugs.sqlite')
# published-at does not contain timezone info, but is assumed to be EST
os.environ['TZ'] = 'America/New_York'
def main():
cli = Cli()
cli.arg_dir('dest_dir', help='Download all episodes to dest_dir/year/')
cli.arg_bool('--dry-run', help='Do not download, just parse')
args = cli.parse()
try:
for title, query in (
('Podcasts', 'radiolab/podcasts'),
('Radio Shows', 'radiolab/radio-shows'),
# ('Broadcasts', 'radiolabmoreperfect/radio-broadcasts'),
):
processEpisodeList(args.dest_dir, title, query,
dry_run=args.dry_run)
except Exception as e:
print(' ERROR: ' + str(e), file=stderr)
exit(1)
print('\nDone.\n\nNow check MP3 tags (consistency).')
def processEpisodeList(basedir, title, query, index=1, *, dry_run=False):
print('\nProcessing: {}'.format(title), end='')
dat = Curl.json('{}/channel/shows/{}/{}?limit=9'.format(API, query, index))
total = dat['data']['attributes']['total-pages']
print(' ({}/{})'.format(index, total))
anything_new = False
for inc in dat['included']:
anything_new |= processEpisode(inc['attributes'], basedir,
dry_run=dry_run)
if anything_new and index < total:
processEpisodeList(basedir, title, query, index + 1, dry_run=dry_run)
def processEpisode(obj, basedir, *, dry_run=False):
uid = obj['cms-pk']
if db_ids.contains(COHORT, uid):
return False # Already exists
title = obj['title'].strip()
slug = obj['slug'].strip()
# [newsdate] 2009-11-03T00:35:34-05:00 [publish-at] 2009-11-03T00:35:34
date_a = StrFormat.to_date(obj['newsdate'].strip())
date_b = StrFormat.to_date(obj['publish-at'].strip())
date = date_a if date_a.timestamp() <= date_b.timestamp() else date_b
# create by-year subdir
dest_dir = os.path.join(basedir, str(date.year))
if not dry_run and not os.path.exists(dest_dir):
os.mkdir(dest_dir)
# make filename and download list
fname = '{} - {}'.format(date.strftime('%Y-%m-%d'),
StrFormat.safe_filename(title))
urllist = [obj['audio'], obj['video']]
urllist = [x for x in urllist if isinstance(x, str) and Curl.valid_url(x)]
if not urllist:
print('\ndownloading: {} ({}, {})'.format(fname, uid, slug))
print(' No downloadable media found.')
return False
# get image
img_url, img_desc = get_img_desc(obj['image-main'])
if img_url:
urllist.append(img_url)
# download files
intro = '\ndownloading: {} ({})'.format(fname, uid)
flag = Curl.once(dest_dir, fname, urllist, date, override=False,
dry_run=dry_run, verbose=True, intro=intro)
@FileWrite.once(dest_dir, fname + '.txt', date, override=False,
dry_run=dry_run, verbose=True, intro=flag or intro)
def write_description():
nonlocal flag
flag = True
desc = title + '\n' + '=' * len(title)
desc += '\n\n' + StrFormat.strip_html(obj['body'])
if img_desc:
desc += '\n\n' + img_desc
return desc + '\n\n\n' + obj['url'].strip() + '\n' # link to article
@FileWrite.once(dest_dir, fname + '.transcript.txt', date, override=False,
dry_run=dry_run, verbose=True, intro=flag or intro)
def write_transcript():
nonlocal flag
flag = True
data = StrFormat.strip_html(obj['transcript'])
return data + '\n' if data else None
# success! now save state
if flag and not dry_run:
db_ids.put(COHORT, uid, fname)
db_slugs.put(COHORT, uid, slug)
print(' SLUG: {}'.format(slug))
return flag # potentially need to query the next page too
def get_img_desc(obj):
if not obj:
return (None, None)
url = (obj['url'] or '').strip()
if not url:
return (None, None)
txt = None
cred_name = obj['credits-name'].strip()
cred_url = obj['credits-url'].strip()
if cred_name:
txt = 'Image by ' + cred_name
if cred_url:
if txt:
txt += ' @ ' + cred_url
else:
txt = 'Image source: ' + cred_url
return (url, txt)
# Individual episodes/segments taken from Google search
# -> inurl:radiolab/segments site:wnycstudios.org
# -> inurl:radiolab/episodes site:wnycstudios.org
# Then regex: /episodes/([^;]*?)" onmousedown
def processSingle(slug, basedir):
# cms-pk = 91947 , slug = '91947-do-i-know-you'
all_slugs = [slug for _, _, _, slug in db_slugs]
if slug not in all_slugs:
print(slug)
data = Curl.json('{}/story/{}'.format(API, slug))
try:
processEpisode(data['data']['attributes'], basedir, dry_run=True)
except URLError as e:
print(' ERROR: ' + str(e), file=stderr)
main()


@@ -0,0 +1,6 @@
#!/usr/bin/env python3
from botlib.tgclient import TGClient
print('open a new telegram chat window with your bot and send /start')
TGClient.listen_chat_info(__API_KEY__, 'my-username')


@@ -0,0 +1,61 @@
#!/usr/bin/env python3
from botlib.tgclient import TGClient
bot = TGClient(__API_KEY__, polling=True, allowedUsers=['my-username'])
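# data_store: per-chat configuration storage; assumed to be defined elsewhere (not shown in this example)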
@bot.message_handler(commands=['hi'])
def bot_reply(message):
if bot.allowed(message): # only reply to a single user (my-username)
bot.reply_to(message, 'Good evening my dear.')
@bot.message_handler(commands=['set'])
def update_config(message):
if bot.allowed(message):
try:
config = data_store.get(message.chat.id)
except KeyError:
bot.reply_to(message, 'Not found.')
return
if message.text == '/set day':
config.param = 'day'
elif message.text == '/set night':
config.param = 'night'
else:
bot.reply_to(message, 'Usage: /set [day|night]')
@bot.message_handler(commands=['start'])
def new_chat_info(message):
bot.log_chat_info(message.chat)
if bot.allowed(message):
if data_store.get(message.chat.id):
bot.reply_to(message, 'Already exists')
else:
CreateNew(message)
class CreateNew:
def __init__(self, message):
self.ask_name(message)
def ask_name(self, message):
msg = bot.send_force_reply(message.chat.id, 'Enter Name:')
bot.register_next_step_handler(msg, self.ask_interval)
def ask_interval(self, message):
self.name = message.text
msg = bot.send_buttons(message.chat.id, 'Update interval (minutes):',
options=[3, 5, 10, 15, 30, 60])
bot.register_next_step_handler(msg, self.finish)
def finish(self, message):
try:
interval = int(message.text)
except ValueError:
bot.send_abort_keyboard(message, 'Not a number. Aborting.')
return
print('Name:', self.name, 'interval:', interval)
bot.send_message(message.chat.id, 'done.')


@@ -0,0 +1,54 @@
#!/usr/bin/env python3
from botlib.cron import Cron
from botlib.helper import Log
from botlib.oncedb import OnceDB
from botlib.tgclient import TGClient
# the pipeline process logic is split up:
# - you can have one file for generating the entries and writing to db (import)
# e.g., import an example from web-scraper and call download()
# - and another file to read db and send its entries to telegram (this file)
# of course, you can put your download logic inside this file as well
import sub_job_a as jobA
import sub_job_b as jobB
cron = Cron()
bot = TGClient(__API_KEY__, polling=True, allowedUsers=['my-username'])
bot.set_on_kill(cron.stop)
def main():
def clean_db(_):
Log.info('[clean up]')
OnceDB('cache.sqlite').cleanup(limit=150)
def notify_jobA(_):
jobA.download(topic='development', cohort='dev:py')
send2telegram(__A_CHAT_ID__)
def notify_jobB(_):
jobB.download()
send2telegram(__ANOTHER_CHAT_ID__)
# Log.info('Ready')
cron.add_job(10, notify_jobA) # every 10 min
cron.add_job(30, notify_jobB) # every 30 min
cron.add_job(1440, clean_db) # daily
cron.start()
# cron.fire()
def send2telegram(chat_id):
db = OnceDB('cache.sqlite')
# db.mark_all_done()
def _send(cohort, uid, obj):
Log.info('[push] {} {}'.format(cohort, uid))
return bot.send(chat_id, obj, parse_mode='HTML',
disable_web_page_preview=True)
if not db.foreach(_send):
# send() sleeps 45 sec (on error), safe to call immediately
send2telegram(chat_id)
main()


@@ -0,0 +1,35 @@
# How-to web scraping
Use `playground.py` for quick testing.
Initially, you have to set `cache_only=False`, otherwise no data is downloaded.
After the first download, re-enable `cache_only` so you don't have to download the same data over and over again.
Also, when you feel ready, remove the `break` statement to check that the parsing works for all entries.
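A minimal sketch of that workflow, assuming `cache_only` can also be passed explicitly as `False` (the URL is the one from `playground.py`):

```python
from botlib.curl import Curl

URL = 'https://www.vice.com/en/topic/motherboard'

# first run: actually fetch the page (and fill the cache)
source = Curl.get(URL, cache_only=False)

# all later runs: serve from the cache, no repeated downloads
# source = Curl.get(URL, cache_only=True)
```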
## Finding a proper `select`
The hardest part is getting all regex matches right.
Open the browser devtools and choose the element picker.
Hover over the first element / row of the data you'd like to retrieve.
Pick whatever tag or class seems appropriate; also look at neighboring tags.
The `select` must match all entries but no unnecessary ones, although you can always filter out extra matches later.
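For illustration, a made-up page structure and a `select` that fits this rule (all class names here are hypothetical):

```python
# hypothetical markup, one wrapper element per entry:
#
#   <div class="teaser__content"><h3><a href="/a">First</a></h3><p>...</p></div>
#   <div class="teaser__content"><h3><a href="/b">Second</a></h3><p>...</p></div>
#   <div class="newsletter-banner">not an entry</div>
#
# '.teaser__content' matches both entries but not the banner
select = '.teaser__content'
```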
## Finding the regex
The matches for the individual data fields are tricky too.
Select and right-click on the element you picked above.
Important: either use "Edit as HTML" or copy the element's outer HTML.
The rendered devtools view omits whitespace and displays `'` as `"`, so make sure you know what the raw source you are matching against actually looks like.
Now begins the trial-and-error part.
The regex will match the first occurrence, so if there are two anchor tags and you need the second one, you have to get creative.
For example, this is the case in the craigslist example.
Here I can match the second anchor because it is contained in an `h3` heading.
Try to keep the match as compact as possible; this makes it more robust against source code changes.
For example, use `<a [^>]*>` to match an opening anchor with arbitrary attributes.
Some sites put the `href` immediately after `<a`, others somewhere in between.
Be creative.
Use `[\s\S]*?` to match anything (instead of just `.`), including whitespace and newlines.
And finally, have at least one capturing group (`()`).
Note: whitespace will be stripped from the captured text.
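A quick way to play with such a pattern is plain `re` from the standard library; the snippet below is made up, loosely modeled on the craigslist rows from the example further down:

```python
import re

# the first <a> is the thumbnail link, the wanted title sits inside the <h3>
html = '''<li class="result-row">
  <a href="/boo/d/sailboat/123.html"><img src="thumb.jpg"></a>
  <h3 class="result-heading">
    <a href="/boo/d/sailboat/123.html">Classic sailboat </a>
  </h3>
</li>'''

# anchoring on '<h3' skips the thumbnail anchor; [\s\S]*? also crosses newlines
m = re.search(r'<h3[\s\S]*?<a [^>]*>([\s\S]*?)</a>', html)
print(m.group(1).strip())  # -> Classic sailboat
```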


@@ -0,0 +1,28 @@
#!/usr/bin/env python3
from botlib.curl import Curl
from botlib.html2list import HTML2List, MatchGroup
from botlib.oncedb import OnceDB
def download(*, topic='motherboard', cohort='vice:motherboard'):
db = OnceDB('cache.sqlite')
url = 'https://www.vice.com/en/topic/{}'.format(topic)
select = '.vice-card__content'
match = MatchGroup({
'url': r'<a href="([^"]*)"',
'title': r'<h3[^>]*><a [^>]*>([\s\S]*?)</a>[\s\S]*?</h3>',
'desc': r'<p[^>]*>([\s\S]*?)</p>',
})
for elem in reversed(HTML2List(select).parse(Curl.get(url))):
match.set_html(elem)
x_uid = match['url']
if not x_uid or db.contains(cohort, x_uid):
continue
txt = '<a href="https://www.vice.com{url}">{title}</a>'.format(**match)
txt += '\n' + str(match['desc'])
if txt:
db.put(cohort, x_uid, txt)
# download()


@@ -0,0 +1,20 @@
#!/usr/bin/env python3
from botlib.curl import Curl
from botlib.html2list import HTML2List, MatchGroup
URL = 'https://www.vice.com/en/topic/motherboard'
SOURCE = Curl.get(URL, cache_only=True)
SELECT = '.vice-card__content'
match = MatchGroup({
'url': r'<a href="([^"]*)"',
'title': r'<h3[^>]*><a [^>]*>([\s\S]*?)</a>[\s\S]*?</h3>',
'desc': r'<p[^>]*>([\s\S]*?)</p>',
'wrong-regex': r'<a xref="([\s\S]*?)"',
})
for elem in reversed(HTML2List(SELECT).parse(SOURCE)):
match.set_html(elem)
for k, v in match.to_dict().items():
print(k, '=', v)
print()
break


@@ -0,0 +1,45 @@
#!/usr/bin/env python3
from botlib.curl import Curl
from botlib.html2list import HTML2List, MatchGroup
from botlib.oncedb import OnceDB
CRAIGSLIST = 'https://newyork.craigslist.org/search/boo'
def load(url):
# return open('test.html')
return Curl.get(url)
def download():
db = OnceDB('cache.sqlite')
def proc(cohort, source, select, regex={}, fn=str):
match = MatchGroup(regex)
for elem in reversed(HTML2List(select).parse(source)):
match.set_html(elem)
x_uid = match['url']
if not x_uid or db.contains(cohort, x_uid):
continue
txt = (fn(match) or '').strip()
if txt:
print(txt)
db.put(cohort, x_uid, txt)
proc('boat:craigslist', load(CRAIGSLIST), 'li.result-row', {
'url': r'<a href="([^"]*)"',
'title': r'<h3[\s\S]*?<a [^>]*>([\s\S]*?)</a>[\s\S]*?</h3>',
'price': r'<span class="result-price">([\s\S]*?)</span>',
'hood': r'<span class="result-hood">([\s\S]*?)</span>',
}, lambda match: '''
<a href="{url}">{title}</a>
<strong>{price}</strong>, {hood}'''.format(**match))
# process another source ...
# def fn(match):
# print(match.to_dict())
# return advanced_fn(match)
# proc(cohort, load(url), select, regex, fn)
# download()