From 9b4440c70015ab30d2ab41a34ea6d24f9679c7cd Mon Sep 17 00:00:00 2001 From: relikd Date: Wed, 28 Sep 2022 22:35:21 +0200 Subject: [PATCH] feat: add Curl.post() --- botlib/curl.py | 83 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 20 deletions(-) diff --git a/botlib/curl.py b/botlib/curl.py index eeb61e0..0561267 100755 --- a/botlib/curl.py +++ b/botlib/curl.py @@ -48,14 +48,40 @@ class Curl: md5(url.encode()).hexdigest()) @staticmethod - def open(url: str, *, headers: Optional[Dict[str, str]] = None) \ - -> Optional[HTTPResponse]: + def _cached_is_recent(fname: str, *, maxAge: int) -> bool: + fname = os.path.join(Curl.CACHE_DIR, fname) + return os.path.isfile(fname) and FileTime.get(fname) < maxAge + + @staticmethod + def _cached_read( + conn: Optional[HTTPResponse], fname_data: str, fname_head: str + ) -> Optional[TextIO]: + fname_data = os.path.join(Curl.CACHE_DIR, fname_data) + if conn: + os.makedirs(Curl.CACHE_DIR, exist_ok=True) + with open(os.path.join(Curl.CACHE_DIR, fname_head), 'w') as fp: + fp.write(str(conn.info()).strip()) + with open(fname_data, 'wb') as fpb: + while True: + data = conn.read(8192) # 1024 Bytes + if not data: + break + fpb.write(data) + return open(fname_data) if os.path.isfile(fname_data) else None + + @staticmethod + def open( + url: str, + *, + post: Optional[bytes] = None, + headers: Optional[Dict[str, str]] = None, + ) -> Optional[HTTPResponse]: ''' Open a network connection, returl urlopen() result or None. ''' try: head = {'User-Agent': 'Mozilla/5.0'} if headers: head.update(headers) - return urlopen(Request(url, headers=head)) + return urlopen(Request(url, data=post, headers=head)) except Exception as e: if isinstance(e, HTTPError) and e.getcode() == 304: # print('Not-Modified: {}'.format(url), file=stderr) @@ -64,31 +90,48 @@ class Curl: return None @staticmethod - def get(url: str, *, cache_only: bool = False) -> Optional[TextIO]: + def get( + url: str, + *, + cache_only: bool = False, + headers: Optional[Dict[str, str]] = None, + ) -> Optional[TextIO]: ''' Returns an already open file pointer. You are responsible for closing the file. NOTE: `HTML2List.parse` and `Feed2List.parse` will close it for you. ''' - fname = '{}/curl-{}.data'.format(Curl.CACHE_DIR, Curl.url_hash(url)) - fname_head = fname[:-5] + '.head' + fname = 'curl-{}.data'.format(Curl.url_hash(url)) # If file was created less than 45 sec ago, reuse cached value - if cache_only or (os.path.isfile(fname) and FileTime.get(fname) < 45): - return open(fname) + if cache_only or Curl._cached_is_recent(fname, maxAge=45): + return Curl._cached_read(None, fname, '') - os.makedirs(Curl.CACHE_DIR, exist_ok=True) - conn = Curl.open(url, headers=_read_modified_header(fname_head)) - if conn: - with open(fname_head, 'w') as fp: - fp.write(str(conn.info()).strip()) - with open(fname, 'wb') as fpb: - while True: - data = conn.read(8192) # 1024 Bytes - if not data: - break - fpb.write(data) + fname_head = fname[:-5] + '.head' + head = _read_modified_header(fname_head) + if headers: + head.update(headers) + conn = Curl.open(url, headers=head) + return Curl._cached_read(conn, fname, fname_head) - return open(fname) if os.path.isfile(fname) else None + @staticmethod + def post( + url: str, + data: bytes, + *, + cache_only: bool = False, + headers: Optional[Dict[str, str]] = None, + ) -> Optional[TextIO]: + ''' + Perform POST operation. + Returns an already open file pointer. + You are responsible for closing the file. + ''' + fname = 'curl-{}.post.data'.format(Curl.url_hash(url)) + if cache_only: + return Curl._cached_read(None, fname, '') + + conn = Curl.open(url, post=data, headers=headers) + return Curl._cached_read(conn, fname, fname[:-5] + '.head') @staticmethod def json(url: str, fallback: Any = None, *, cache_only: bool = False) \