relikd
2022-03-16 21:53:02 +01:00
commit 68167e9d1d
12 changed files with 1037 additions and 0 deletions

LICENSE Normal file

@@ -0,0 +1,7 @@
Copyright 2022 relikd
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

README.md Normal file

@@ -0,0 +1,17 @@
# botlib
A collection of scripts to parse and process input files (HTML, RSS, Atom) without dependencies<sup>1</sup>.
<sup>1</sup>: Well, except for `pytelegrambotapi`, but feel free to replace it with another module.
Includes tools for extracting “news” articles, detecting duplicates, downloading content, scheduling cron-like repeated events, plus a Telegram bot client.
Basically everything you need for quick and dirty format processing (html->RSS, notification->telegram, rss/podcast->download, web scraper, ...) without writing the same code over and over again.
The architecture is modular and geared toward pipeline processing.
Use whatever suits the task at hand.
### In progress
Documentation, examples, tests and `setup.py` will be added soon.
Meanwhile, take a look at the [Usage](botlib/README.md) documentation.

botlib/README.md Normal file

@@ -0,0 +1,180 @@
# Usage
Just import the parts you need:
```py
from botlib.cli import Cli, DirType
from botlib.cron import Cron
from botlib.curl import Curl
from botlib.feed2list import Feed2List
from botlib.helper import Log, FileTime, StrFormat
from botlib.html2list import HTML2List, MatchGroup
from botlib.oncedb import OnceDB
from botlib.tgclient import TGClient
```
## Cli, DirType
TODO
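In the meantime, a minimal sketch (based on `botlib/cli.py` from this commit; the argument names are placeholders):
```py
from botlib.cli import Cli, DirType

cli = Cli(description='example tool')
cli.arg('url', help='feed URL to process')  # plain add_argument()
cli.arg_bool('-v', '--verbose')             # store_true flag
cli.arg_dir('-o', '--outdir', default='.') # must be an existing directory
cli.arg_file('-c', '--config', mode='r')   # opened as a file pointer
args = cli.parse()                          # shorthand for parse_args()
```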
## Cron
Simple recurring tasks. Add jobs either manually or via a CSV jobs file.
```py
# If you just have one job to do:
Cron.simple(5, callback).fire()
# OR, customize:
cron = Cron(sleep=range(1, 8))
# Load from CSV
cron.load_csv('jobs.csv', str, cols=[int, str, str])
cron.save_csv('jobs.csv', cols=['chat-id', 'url', 'regex'])
# Or add jobs manually
cron.add_job(10, callback, ['42', 'custom', obj]) # every 10 min
cron.add_job(1440, clean_db) # daily
cron.start() # always call start() to begin updates
cron.fire() # optionally: fire callbacks immediately
```
Note: when working with `OnceDB`, make sure you open the DB inside the callback method.
Otherwise SQLite will complain that it is being used on a thread other than the one it was created on.
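For example (a sketch; `Cron` runs jobs on its timer thread, so the connection is created on that same thread):
```py
from botlib.cron import Cron
from botlib.oncedb import OnceDB

def process_queue(arg):
    db = OnceDB('cache.sqlite')  # open here, on whatever thread runs the job
    db.foreach(lambda cohort, uid, obj: True)  # process + mark done

Cron.simple(10, process_queue)  # every 10 minutes
```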
## Curl
Used to download web content. Ignores all network errors (just logs them).
Implements a quick cache: if you request the same URL twice within 45 seconds, the previously downloaded content is reused.
Includes an etag / last-modified check to reduce network load.
```py
# for these 3 calls, a download connection is created just once.
# the result is stored in cache/curl-example.org-1234..ABC.data
Curl.get('https://example.org')
Curl.get('https://example.org')
Curl.get('https://example.org')
# if the URL differs (even just the query), the content is downloaded into another file
Curl.get('https://example.org?1')
# download files
Curl.file('https://example.org/image.png', './example-image.png')
# ... or just open a connection
with Curl.open('https://example.org') as wp:
wp.read()
```
There is also an easy getter that downloads files only if they do not yet exist locally.
```py
Curl.once('./dest/dir/', 'filename', [
'https://example.org/audio.mp3',
'https://example.org/image.jpg'
], date, desc='my super long description')
```
This will check whether `./dest/dir/filename.mp3` and `./dest/dir/filename.jpg` exist and, if not, download them.
It will also put the content of `desc` into `./dest/dir/filename.txt` (again, only if the file does not exist yet).
All file modification dates will be set to `date`.
## Feed2List
TODO
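In the meantime, a minimal sketch (based on `botlib/feed2list.py` from this commit; the URL is a placeholder):
```py
from botlib.curl import Curl
from botlib.feed2list import Feed2List

# handles RSS (channel/item) and Atom (entry); the file pointer is closed for you
for item in Feed2List(Curl.get('https://example.org/feed.xml'),
                      keys=['title', 'link', 'pubDate']):
    # date fields (pubDate, updated, ...) are parsed into datetime objects
    print(item.get('pubDate'), item.get('title'))
```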
## Log, FileTime, StrFormat
TODO
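In the meantime, a quick tour (based on `botlib/helper.py` from this commit; file names are placeholders):
```py
from datetime import datetime
from botlib.helper import Log, FileTime, StrFormat

Log.info('starting up')  # timestamped message on stdout
Log.error('boom')        # timestamped [ERROR] message on stderr

FileTime.set('report.txt', datetime.now())  # set mtime from a datetime
age = FileTime.get('report.txt')            # seconds since last modification

StrFormat.strip_html('<p>Hi <a href="/x">there</a></p>')  # -> 'Hi there (/x)'
StrFormat.to_date('Wed, 16 Mar 2022 21:53:02 +0100')      # RSS date -> datetime
StrFormat.safe_filename('Ärger?')                         # -> 'Aerger'
```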
## HTML2List, MatchGroup
Used to parse HTML content into a feed-like list.
The selector is a limited CSS selector matching a single tag (with optional classes); nested selectors (` `, `>`, `+`) are not supported.
```py
match = MatchGroup({
'url': r'<a href="([^"]*)">',
'title': r'<a [^>]*>([\s\S]*?)</a>'
})
source = open('path/to/src.html') # auto-closed in parse()
selector = 'article.main'
for elem in reversed(HTML2List(selector).parse(source)):
match.set_html(elem)
if match['url']:
print('<a href="{url}">{title}</a>'.format(**match))
```
You may also call this script directly (CLI):
```sh
html2list.py \
'path/to/src.html' \
'article.main' \
-t '<a href="{#url#}">{#title#}</a>' \
'url:<a href="([^"]*)">' \
'title:<a [^>]*>([\s\S]*?)</a>'
```
If you omit the template (`-t`), the output will be in JSON format.
## OnceDB
Used as a cache. The DB ensures that each unique-id entry is evaluated only once.
Adding existing entries is silently ignored.
You can iterate over existing entries that haven't been processed yet.
```py
db = OnceDB('cache.sqlite')
# Either do a pre-evaluation to break execution early:
if db.contains(cohort, uid):
    continue # e.g., skip within your processing loop
# Or, just put a new object into the store.
# If it exists, the object is not added a second time
db.put(cohort, uid, 'my-object')
# Entries are unique regarding cohort + uid
# If you cleanup() the DB, entries are grouped by cohort and then cleaned
db.cleanup(limit=20) # per cohort: keep the 20 newest done entries, drop older ones
# The DB also acts as a queue: you can enumerate outstanding entries
def _send(cohort, uid, obj):
    success = do_stuff(obj)  # your processing here; do_stuff is a placeholder
    return success # if False, cancel enumeration
if not db.foreach(_send):
# something went wrong, you returned False in _send()
pass
```
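You can also iterate manually and decide per entry when to mark it done (cf. the docstring in `botlib/oncedb.py`; `deliver` is a placeholder):
```py
# newest entries first; each entry stays queued until marked done
for rowid, cohort, uid, obj in reversed(db):
    if deliver(obj):
        db.mark_done(rowid)
```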
## TGClient
Communication with the Telegram Bot API. (`pip3 install pytelegrambotapi`)
```py
# Quick way to just retrieve a chat-id
TGClient.listen_chat_info(API_KEY, 'username')
exit(0)
# Else: create a one-time bot
bot = TGClient(API_KEY, polling=False, allowedUsers=['username'])
bot.send(chat_id, 'message', parse_mode='HTML', disable_web_page_preview=True)
# Or: create a polling bot
bot = TGClient(API_KEY, polling=True, allowedUsers=['username'])
bot.set_on_kill(cron.stop)
@bot.message_handler(commands=['info'])
def current_job_info(message):
if bot.allowed(message): # checks if user is permitted
try:
bot.reply_to(message, cron.get(message.chat.id).object,
disable_web_page_preview=True)
except KeyError:
bot.reply_to(message, 'Not found.')
```
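There are also small helpers for reply keyboards and forced replies (see `botlib/tgclient.py`; `chat_id` as above):
```py
# one-time reply keyboard with predefined options
bot.send_buttons(chat_id, 'Pick one:', ['rss', 'html', 'podcast'])
# ask the user to reply directly to this message
bot.send_force_reply(chat_id, 'Which URL should I watch?')
```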

botlib/__init__.py Executable file

@@ -0,0 +1,3 @@
# import sys
# if __name__ != '__main__':
# sys.path.insert(0, __path__[0])

botlib/cli.py Executable file

@@ -0,0 +1,30 @@
#!/usr/bin/env python3
import os
from argparse import ArgumentParser, ArgumentTypeError, FileType
def DirType(string):
if os.path.isdir(string):
return string
raise ArgumentTypeError(
'Directory does not exist: "{}"'.format(os.path.abspath(string)))
class Cli(ArgumentParser):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def arg(self, *args, **kwargs):
self.add_argument(*args, **kwargs)
def arg_bool(self, *args, **kwargs):
self.add_argument(*args, **kwargs, action='store_true')
def arg_dir(self, *args, **kwargs):
self.add_argument(*args, **kwargs, type=DirType)
def arg_file(self, *args, mode='r', **kwargs):
self.add_argument(*args, **kwargs, type=FileType(mode))
def parse(self):
return self.parse_args()

botlib/cron.py Executable file

@@ -0,0 +1,127 @@
#!/usr/bin/env python3
from sys import stderr
from threading import Timer
from datetime import datetime as date
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
class Cron:
class Job:
def __init__(self, interval, callback, object=None):
self.interval = interval
self.callback = callback
self.object = object
def run(self, ts=0):
if self.interval > 0 and ts % self.interval == 0:
self.callback(self.object)
@staticmethod
def simple(interval: int, callback, arg=None, *, sleep=range(1, 8)):
cron = Cron(sleep=sleep)
cron.add_job(interval, callback, arg)
cron.start()
return cron
def __init__(self, *, sleep=range(1, 8)):
self.sleep = sleep
self._timer = None
self._last_t = -1
self.clear()
def clear(self):
self.jobs = []
def add_job(self, interval: int, callback, arg=None):
job = Cron.Job(interval, callback, arg)
self.push(job)
return job
def push(self, job):
assert isinstance(job, Cron.Job), type(job)
self.jobs.append(job)
def pop(self, key):
return self.jobs.pop(self.jobs.index(self.get(key)))
def get(self, key):
for x in self.jobs:
obj = x.object
if not obj:
continue
if (isinstance(obj, list) and obj[0] == key) or obj == key:
return x
raise KeyError('Key not found: ' + str(key))
# CSV import / export
    def load_csv(self, fname: str, callback, *, cols: list):
self.clear()
try:
with open(fname) as fp:
for line in fp.readlines():
if line.startswith('#'):
continue
time, *obj = [x.strip() or None for x in line.split(',')]
obj = [fn(o) if o else None for o, fn in zip(obj, cols)]
if len(obj) < len(cols):
obj += [None] * (len(cols) - len(obj))
self.add_job(int(time), callback, obj)
except FileNotFoundError:
print('File "{}" not found. No jobs loaded.'.format(fname),
file=stderr)
return len(self.jobs)
    def save_csv(self, fname: str, *, cols: list):
with open(fname, 'w') as fp:
fp.write(' , '.join(['# interval'] + cols) + '\n')
for job in self.jobs:
if not job.object:
continue
fp.write(str(job.interval))
if isinstance(job.object, list):
for x in job.object:
fp.write(',' + ('' if x is None else str(x)))
else:
fp.write(',' + str(job.object))
fp.write('\n')
# Handle repeat timer
def start(self):
if not self._timer:
self._timer = RepeatTimer(15, self._callback)
self._timer.start() # cancel()
def stop(self):
if self._timer:
if self._timer.is_alive():
self._timer.cancel()
self._timer = None
def fire(self):
now = date.now()
self._last_t = now.day * 1440 + now.hour * 60 + now.minute
for job in self.jobs:
job.run()
def _callback(self):
now = date.now()
if now.hour in self.sleep:
return
        # Timer fires several times per minute; ensure jobs run once per minute.
ts = now.day * 1440 + now.hour * 60 + now.minute
if self._last_t == ts:
return
self._last_t = ts
for job in self.jobs:
job.run(ts)
def __str__(self):
return '\n'.join('@{}m {}'.format(job.interval, job.object)
for job in self.jobs)

botlib/curl.py Executable file

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
import os
import json
from sys import stderr
from hashlib import md5
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import urlretrieve, urlopen, Request
from .helper import FileTime
import ssl
# somehow macOS default behavior for SSL verification is broken
ssl._create_default_https_context = ssl._create_unverified_context
def _read_modified_header(fname: str): # dict or None
if not os.path.isfile(fname):
return None
res = {}
with open(fname) as fp:
head = dict(x.strip().split(': ', 1) for x in fp.readlines())
etag = head.get('Etag')
if etag:
res['If-None-Match'] = etag
lastmod = head.get('Last-Modified')
if lastmod:
res['If-Modified-Since'] = lastmod.replace('-gzip', '')
return res or None
class Curl:
CACHE_DIR = 'cache'
@staticmethod
def valid_url(url):
url = url.strip().replace(' ', '+')
x = urlparse(url)
return x if x.scheme and x.netloc else None
@staticmethod
def url_hash(url) -> str:
x = Curl.valid_url(url)
return '{}-{}'.format(x.hostname if x else 'ERR',
md5(url.encode()).hexdigest())
@staticmethod
def open(url: str, *, headers={}): # url-open-pointer or None
try:
head = {'User-Agent': 'Mozilla/5.0'}
if headers:
head.update(headers)
return urlopen(Request(url, headers=head))
except Exception as e:
if isinstance(e, HTTPError) and e.getcode() == 304:
# print('Not-Modified: {}'.format(url), file=stderr)
return None # ignore not-modified
print('ERROR: Load URL "{}" -- {}'.format(url, e), file=stderr)
return None
@staticmethod
def get(url: str, *, cache_only=False): # file-pointer
'''
Returns an already open file pointer.
You are responsible for closing the file.
NOTE: `HTML2List.parse` and `Feed2List.parse` will close it for you.
'''
fname = '{}/curl-{}.data'.format(Curl.CACHE_DIR, Curl.url_hash(url))
fname_head = fname[:-5] + '.head'
# If file was created less than 45 sec ago, reuse cached value
if cache_only or (os.path.isfile(fname) and FileTime.get(fname) < 45):
return open(fname)
os.makedirs(Curl.CACHE_DIR, exist_ok=True)
conn = Curl.open(url, headers=_read_modified_header(fname_head))
if conn:
with open(fname_head, 'w') as fp:
fp.write(str(conn.info()).strip())
with open(fname, 'wb') as fp:
while True:
                    data = conn.read(8192) # read in 8 KiB chunks
if not data:
break
fp.write(data)
if os.path.isfile(fname):
return open(fname)
@staticmethod
def json(url: str, fallback=None, *, cache_only=False) -> object:
conn = Curl.get(url, cache_only=cache_only)
if not conn:
return fallback
with conn as fp:
return json.load(fp)
@staticmethod
def file(url: str, dest_path: str, *, raise_except=False) -> bool:
tmp_file = dest_path + '.inprogress'
try:
urlretrieve(url, tmp_file)
os.rename(tmp_file, dest_path) # atomic download, no broken files
return True
except HTTPError as e:
# print('ERROR: Load URL "{}" -- {}'.format(url, e), file=stderr)
if raise_except:
raise e
return False
@staticmethod
def once(dest_dir, fname, urllist, date, desc=None, *,
override=False, dry_run=False, verbose=False, intro=''):
did_update = False
for url_str in urllist:
parts = Curl.valid_url(url_str)
if not parts:
raise URLError('URL not valid: "{}"'.format(url_str))
ext = parts.path.split('.')[-1] or 'unknown'
file_path = os.path.join(dest_dir, fname + '.' + ext)
if override or not os.path.isfile(file_path):
if not did_update and verbose and intro:
print(intro)
did_update = True
if verbose:
print(' GET', parts.geturl())
if not dry_run:
Curl.file(parts.geturl(), file_path, raise_except=True)
FileTime.set(file_path, date)
if desc:
desc_path = os.path.join(dest_dir, fname + '.txt')
if override or not os.path.isfile(desc_path):
did_update = True
if verbose:
print(' >', desc_path)
if not dry_run:
with open(desc_path, 'w') as f:
f.write(desc)
FileTime.set(desc_path, date)
return did_update

botlib/feed2list.py Normal file

@@ -0,0 +1,60 @@
#!/usr/bin/env python3
import xml.etree.ElementTree as ET
from .helper import StrFormat
def Feed2List(fp, *, keys=[]):
def parse_xml_without_namespace(file):
ns = {}
xml_iter = ET.iterparse(file, ('start-ns', 'start'))
for event, elem in xml_iter:
if event == 'start-ns':
ns['{' + elem[1]] = elem[0] + ':' if elem[0] else ''
elif event == 'start':
tag = elem.tag.split('}')
elem.tag = ''.join(ns[x] for x in tag[:-1]) + tag[-1]
return xml_iter.root
# detect feed format (RSS / Atom)
root = parse_xml_without_namespace(fp)
fp.close()
if root.tag == 'rss': # RSS
selector = 'channel/item'
date_fields = ['pubDate', 'lastBuildDate']
elif root.tag == 'feed': # Atom
selector = 'entry'
date_fields = ['updated', 'published']
else:
raise NotImplementedError('Unrecognizable feed format')
# parse XML
result = []
for item in root.findall(selector):
obj = {}
for child in item:
tag = child.tag
# Filter keys that are clearly not wanted by user
if keys and tag not in keys:
continue
value = (child.text or '').strip()
# For date-fields, create and return date
if tag in date_fields:
value = StrFormat.to_date(value)
            # Return a dict if the tag has attributes, else a plain string
attr = child.attrib
if attr:
if value:
attr[''] = value
value = attr
# Auto-create list type if duplicate keys are used
try:
obj[tag]
if not isinstance(obj[tag], list):
obj[tag] = [obj[tag]]
obj[tag].append(value)
except KeyError:
obj[tag] = value
# Each entry has a key-value-dict. Value may be string or attrib-dict.
# Value may also be a list of mixed string and attrib-dict values.
result.append(obj)
return result

botlib/helper.py Executable file

@@ -0,0 +1,73 @@
#!/usr/bin/env python3
import re
import os # utime, getmtime
import time # mktime, time
from sys import stderr
from html import unescape
from datetime import datetime
import unicodedata # normalize
from string import ascii_letters, digits
class Log:
@staticmethod
def error(e):
print('{} [ERROR] {}'.format(datetime.now(), e), file=stderr)
@staticmethod
def info(m):
print('{} {}'.format(datetime.now(), m))
class FileTime:
@staticmethod
def set(fname, date):
modTime = time.mktime(date.timetuple())
os.utime(fname, (modTime, modTime))
@staticmethod
def get(fname, *, absolute=False):
x = os.path.getmtime(fname)
return x if absolute else time.time() - x
class StrFormat:
re_img = re.compile(r'<img [^>]*?(?:alt="([^"]*?)"[^>]*)?'
r'src="([^"]*?)"(?:[^>]*?alt="([^"]*?)")?[^>]*?/>')
re_href = re.compile(r'<a [^>]*href="([^"]*?)"[^>]*?>(.*?)</a>')
re_br = re.compile(r'<br[^>]*>|</p>')
re_tags = re.compile(r'<[^>]*>')
re_crlf = re.compile(r'[\n\r]{2,}')
@staticmethod
def strip_html(text):
text = StrFormat.re_img.sub(r'[IMG: \2, \1\3]', text)
text = StrFormat.re_href.sub(r'\2 (\1)', text)
text = StrFormat.re_br.sub('\n', text)
text = StrFormat.re_tags.sub('', text)
text = StrFormat.re_crlf.sub('\n\n', text)
return unescape(text).replace(' ', ' ').strip()
@staticmethod
def to_date(text):
for date_format in (
'%a, %d %b %Y %H:%M:%S %z', # RSS
'%Y-%m-%dT%H:%M:%S%z', # Atom
'%Y-%m-%dT%H:%M:%S.%f%z', # Atom
'%Y-%m-%dT%H:%M:%S', # without timezone
'%Y-%m-%dT%H:%M:%S.%f' # without timezone
):
try:
return datetime.strptime(text, date_format)
except ValueError:
pass
raise ValueError('Could not match date format. {}'.format(text))
fnameChars = set('-_.,() {}{}'.format(ascii_letters, digits))
@staticmethod
def safe_filename(text):
        text = unicodedata.normalize('NFKD', text)  # split umlauts into base char + combining mark
        text = text.replace('\u0308', 'e')  # combining diaeresis -> 'e', e.g., Ä -> Ae
text = text.encode('ASCII', 'ignore')
return ''.join(chr(c) for c in text if chr(c) in StrFormat.fnameChars)

botlib/html2list.py Normal file

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
import re
import json
from sys import stderr
from argparse import ArgumentParser, FileType
from html.parser import HTMLParser
class CSSSelector:
''' Limited support, match single tag with classes: div.class.other '''
def __init__(self, selector):
if any(x in ' >+' for x in selector):
raise NotImplementedError(
'No support for nested tags. "{}"'.format(selector))
self.tag, *self.cls = selector.split('.')
def matches(self, tag, attrs):
if self.tag and tag != self.tag:
return False
if self.cls:
for k, val in attrs:
if k == 'class':
classes = val.split()
return all(x in classes for x in self.cls)
return False
return True
class HTML2List(HTMLParser):
'''
:select: CSS-selector should match a list of articles.
:callback: If set, callback is called for each found match.
If not set, return a list of strings instead.
'''
def __init__(self, select, callback=None):
super().__init__()
self._filter = CSSSelector(select)
self._data = '' # temporary data built-up
self._elem = [] # tag stack
self._tgt = 0 # remember matching level for filter
self._result = [] # empty if callback
self._callback = callback or self._result.append
def parse(self, source):
'''
:source: A file-pointer or web-source with read() attribute.
Warning: return value empty if callback is set!
'''
def rb2str(data, fp, limit=256):
try:
return data.decode('utf-8')
except UnicodeDecodeError:
extra = fp.read(limit)
if not extra:
return data
return rb2str(data + extra, fp, limit)
if not source:
return []
while True:
try:
data = source.read(65536) # 64k
except Exception as e:
print('ERROR: {}'.format(e), file=stderr)
data = None
if not data:
break
if isinstance(data, bytes):
data = rb2str(data, source)
self.feed(data)
source.close()
self.close()
return self._result
def handle_starttag(self, tag, attrs):
self._elem.append(tag)
if self._filter.matches(tag, attrs):
if self._tgt > 0:
raise RuntimeError('No nested tags! Adjust your filter.')
self._tgt = len(self._elem) - 1
if self._tgt > 0:
self._data += self.get_starttag_text()
def handle_startendtag(self, tag, attrs):
self._elem.append(tag)
if self._tgt > 0:
self._data += self.get_starttag_text()
def handle_data(self, data):
if self._tgt > 0:
self._data += data
def handle_endtag(self, tag):
if self._tgt > 0:
self._data += '</{}>'.format(tag)
# drop any non-closed tags
while self._elem[-1] != tag:
self._elem.pop(-1) # e.g., <img> which is not start-end-type
self._elem.pop(-1) # remove actual closing tag
# if level matches search-level, yield whole element
if len(self._elem) == self._tgt:
self._tgt = 0
if self._data:
# print('DEBUG:', self._data)
self._callback(self._data)
self._data = ''
class Grep:
'''
Use `[\\s\\S]*?` to match multi-line content.
Will replace all continuous whitespace (incl. newline) with a single space.
    If you wish to keep whitespace, set cleanup to False.
'''
re_whitespace = re.compile(r'\s+') # will also replace newline with space
def __init__(self, regex, *, cleanup=True):
self.cleanup = cleanup
self._rgx = re.compile(regex)
def find(self, text):
res = self._rgx.search(text)
if not res:
return None
res = res.groups()[0]
if self.cleanup:
return self.re_whitespace.sub(' ', res.strip())
return res
class MatchGroup:
''' Use {#tagname#} to replace values with regex value. '''
re_tag = re.compile(r'{#(.*?)#}')
def __init__(self, grepDict={}):
self._regex = {}
for k, v in grepDict.items():
self.add(k, v)
self.set_html('')
def add(self, tagname, regex, *, cleanup=True):
self._regex[tagname] = \
regex if isinstance(regex, Grep) else Grep(regex, cleanup=cleanup)
def set_html(self, html):
self._html = html
self._res = {}
return self
def keys(self):
return self._regex.keys()
def __getitem__(self, key):
try:
return self._res[key]
except KeyError:
val = self._regex[key].find(self._html)
self._res[key] = val
return val
def __str__(self):
return '\n'.join(
'{}: {}'.format(k, self._res.get(k, '<?>')) for k in self._regex)
def to_dict(self):
return {k: self[k] for k in self._regex}
def use_template(self, template):
''' Use {#tagname#} to replace values with regex value. '''
return self.re_tag.sub(lambda x: self[x.groups()[0]], template)
def _cli():
parser = ArgumentParser()
parser.add_argument('FILE', type=FileType('r'), help='Input html file')
parser.add_argument('selector', help='CSS selector. E.g., article.entry')
parser.add_argument('-t', '--template',
help='E.g., <a href="{#url#}">{#title#}</a>')
parser.add_argument('regex', nargs='+',
help='''"tagname:regex" E.g., 'url:<a href="(.*?)">'
'title:<a [^>]*>([\\s\\S]*?)</a>'
''')
args = parser.parse_args()
# create grep/regex mapping
grp = MatchGroup()
for x in args.regex:
try:
tag, regex = x.split(':', 1)
grp.add(tag, regex)
except ValueError:
print('Did you forget to prefix a tagname? `{}`'.format(x),
file=stderr)
exit(1)
# parse
if args.template:
try:
for x in HTML2List(args.selector).parse(args.FILE):
print(grp.set_html(x).use_template(args.template))
except KeyError as e:
print('Did you forget a tagname? ' + str(e), file=stderr)
else:
print(json.dumps([
grp.set_html(x).to_dict()
for x in
HTML2List(args.selector).parse(args.FILE)
]))
if __name__ == '__main__':
_cli()

botlib/oncedb.py Executable file

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
'''
Usage: Load existing `OnceDB()` and `put(cohort, uid, obj)` new entries.
The db ensures that (cohort, uid) pairs are unique. You can add as
many times as you like. Use an (reversed) iterator to enumerate
outstanding entries `for rowid, cohort, uid, obj in reversed(db)`.
Call `mark_done(rowid)` to not process an item again.
Once in a while call `cleanup()` to remove old entries.
'''
import sqlite3
class OnceDB:
def __init__(self, db_path):
self._db = sqlite3.connect(db_path)
self._db.execute('''
CREATE TABLE IF NOT EXISTS queue(
ts DATE DEFAULT (strftime('%s', 'now')),
cohort TEXT NOT NULL,
uid TEXT NOT NULL,
obj BLOB, -- NULL signals a done mark. OR: introduce new var
PRIMARY KEY (cohort, uid) -- SQLite will auto-create index
);
''')
def __del__(self):
self._db.close()
def cleanup(self, limit=200):
        ''' Delete oldest done entries (per cohort) if more than `limit` exist. '''
self._db.execute('''
WITH _tmp AS (
SELECT ROWID, row_number() OVER (
PARTITION BY cohort ORDER by ROWID DESC) AS c
FROM queue
WHERE obj IS NULL
)
DELETE FROM queue
WHERE ROWID in (SELECT ROWID from _tmp WHERE c > ?);
''', (limit,))
self._db.commit()
def put(self, cohort, uid, obj):
''' Silently ignore if a duplicate (cohort, uid) is added. '''
try:
self._db.execute('''
INSERT INTO queue (cohort, uid, obj) VALUES (?, ?, ?);
''', (cohort, uid, obj))
self._db.commit()
return True
except sqlite3.IntegrityError:
# entry (cohort, uid) already exists
return False
def contains(self, cohort, uid):
cur = self._db.cursor()
cur.execute('''
            SELECT 1 FROM queue WHERE cohort IS ? AND uid IS ? LIMIT 1;
''', (cohort, uid))
flag = cur.fetchone() is not None
cur.close()
return flag
def mark_done(self, rowid):
''' Mark (ROWID) as done. Entry remains in cache until cleanup(). '''
if not isinstance(rowid, int):
raise AttributeError('Not of type ROWID: {}'.format(rowid))
self._db.execute('UPDATE queue SET obj = NULL WHERE ROWID = ?;',
(rowid, ))
self._db.commit()
def mark_all_done(self):
''' Mark all entries done. Entry remains in cache until cleanup(). '''
self._db.execute('UPDATE queue SET obj = NULL;')
self._db.commit()
def foreach(self, callback, *, reverse=False):
'''
Exec for all until callback evaluates to false (or end of list).
Automatically marks entries as done (only on success).
'''
for rowid, *elem in reversed(self) if reverse else self:
if callback(*elem):
self.mark_done(rowid)
else:
return False
return True
def __iter__(self, *, reverse=False):
''' Perform query on all un-marked / not-done entries. '''
cur = self._db.cursor()
cur.execute('''
SELECT ROWID, cohort, uid, obj FROM queue
WHERE obj IS NOT NULL
ORDER BY ROWID {};
'''.format('DESC' if reverse else 'ASC'))
yield from cur.fetchall()
cur.close()
def __reversed__(self):
return self.__iter__(reverse=True)

botlib/tgclient.py Executable file

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
import telebot # pip3 install pytelegrambotapi
from threading import Thread
from time import sleep
from .helper import Log
class Kill(Exception):
pass
class TGClient(telebot.TeleBot):
@staticmethod
def listen_chat_info(api_key, user):
bot = TGClient(api_key, polling=True, allowedUsers=[user])
@bot.message_handler(commands=['start'])
def handle_start(message):
bot.log_chat_info(message.chat)
raise Kill()
return bot
def __init__(self, apiKey, *, polling, allowedUsers=[], **kwargs):
super().__init__(apiKey, **kwargs)
self.users = allowedUsers
self.onKillCallback = None
if polling:
def _fn():
try:
Log.info('Ready')
self.polling(skip_pending=True) # none_stop=True
except Kill:
Log.info('Quit by /kill command.')
if self.onKillCallback:
self.onKillCallback()
return
except Exception as e:
Log.error(e)
Log.info('Auto-restart in 15 sec ...')
sleep(15)
_fn()
Thread(target=_fn, name='Polling').start()
@self.message_handler(commands=['?'])
def _healthcheck(message):
if self.allowed(message):
self.reply_to(message, 'yes')
@self.message_handler(commands=['kill'])
def _kill(message):
if self.allowed(message):
self.reply_to(message, 'bye bye')
raise Kill()
def set_on_kill(self, callback):
self.onKillCallback = callback
# Helper methods
def log_chat_info(self, chat):
Log.info('[INFO] chat-id: {} ({}, title: "{}")'.format(
chat.id, chat.type, chat.title or ''))
def allowed(self, src_msg):
return not self.users or src_msg.from_user.username in self.users
def send(self, chat_id, msg, **kwargs):
try:
return self.send_message(chat_id, msg, **kwargs)
except Exception as e:
Log.error(e)
sleep(45)
return None
def send_buttons(self, chat_id, msg, options):
markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True)
markup.add(*(telebot.types.KeyboardButton(x) for x in options))
return self.send_message(chat_id, msg, reply_markup=markup)
def send_abort_keyboard(self, src_msg, reply_msg):
return self.reply_to(src_msg, reply_msg,
reply_markup=telebot.types.ReplyKeyboardRemove())
def send_force_reply(self, chat_id, msg):
return self.send_message(chat_id, msg,
reply_markup=telebot.types.ForceReply())