fix: SQL sanitation with inner join
This commit is contained in:
@@ -8,7 +8,7 @@ from base64 import b64encode
|
|||||||
from typing import List, Dict, Any, Iterable, Optional
|
from typing import List, Dict, Any, Iterable, Optional
|
||||||
|
|
||||||
ITEM_COUNTER = 0
|
ITEM_COUNTER = 0
|
||||||
rx_query = re.compile(r'SELECT([\s\S]*)FROM[\s]+([A-Z_]+)')
|
rx_query = re.compile(r'SELECT([\s\S]*)FROM[\s]+([A-Z_]+)(?:[\s]+INNER JOIN\s+([A-Z_]+))?')
|
||||||
rx_cols = re.compile(r'[\s,;](Z[A-Z_]+)')
|
rx_cols = re.compile(r'[\s,;](Z[A-Z_]+)')
|
||||||
rx_tags = re.compile(r'\%\{[A-Za-z_]+?\}')
|
rx_tags = re.compile(r'\%\{[A-Za-z_]+?\}')
|
||||||
|
|
||||||
@@ -53,16 +53,15 @@ def buildLabel(
|
|||||||
|
|
||||||
|
|
||||||
def sanitize(cursor: sqlite3.Cursor, query: str) -> str:
|
def sanitize(cursor: sqlite3.Cursor, query: str) -> str:
|
||||||
cols, table = rx_query.findall(query)[0]
|
cols, table, joined = rx_query.findall(query)[0]
|
||||||
sel_cols = {x for x in rx_cols.findall(cols)}
|
sel_cols = {x for x in rx_cols.findall(cols)}
|
||||||
all_cols = {x[1] for x in cursor.execute(f'PRAGMA table_info({table});')}
|
all_cols = {x[1] for x in cursor.execute(f'PRAGMA table_info({table});')}
|
||||||
|
if joined:
|
||||||
|
all_cols |= {x[1] for x in cursor.execute(f'PRAGMA table_info({joined});')}
|
||||||
missing_cols = sel_cols.difference(all_cols)
|
missing_cols = sel_cols.difference(all_cols)
|
||||||
for missing in missing_cols:
|
for missing in missing_cols:
|
||||||
if missing == 'ZSERVICENAME':
|
print(f'[WARN] Column "{missing}" not found in {table}. Ignoring.',
|
||||||
pass # ignore irrelevant fields
|
file=sys.stderr)
|
||||||
else:
|
|
||||||
print(f'[WARN] Column "{missing}" not found in {table}. Ignoring.',
|
|
||||||
file=sys.stderr)
|
|
||||||
query = query.replace(missing, 'NULL')
|
query = query.replace(missing, 'NULL')
|
||||||
return query
|
return query
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user