221 lines
9.3 KiB
Python
Executable File
221 lines
9.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: UTF-8 -*-
|
|
import os
|
|
from InterruptSearch import InterruptSearch
|
|
from InterruptIndices import InterruptIndices
|
|
from Probability import Probability
|
|
from RuneText import RuneTextFile
|
|
from LPath import FILES_ALL, FILES_UNSOLVED, LPath
|
|
from KeySearch import GuessPattern
|
|
|
|
|
|
#########################################
|
|
# InterruptDB : Perform heuristic search on best possible interrupts.
|
|
#########################################
|
|
|
|
class InterruptDB(object):
|
|
def __init__(self, data, interrupt, irp_stops=None):
|
|
self.irp = interrupt
|
|
self.iguess = InterruptSearch(data, irp=interrupt, irp_stops=irp_stops)
|
|
self.irp_count = len(self.iguess.stops)
|
|
|
|
def find_best_solution(self, fn_score, keylen):
|
|
if keylen == 0: # without interrupts
|
|
score, skips = fn_score(self.iguess.join(), 1), [[]]
|
|
else:
|
|
score, skips = self.iguess.all(keylen, fn_score)
|
|
for i, interrupts in enumerate(skips):
|
|
skips[i] = self.iguess.to_occurrence_index(interrupts)
|
|
return score, skips
|
|
|
|
def write(self, dbname, desc, score, keylen, nums):
|
|
with open(LPath.db(dbname), 'a') as f:
|
|
for solution in nums:
|
|
solution = ','.join(map(str, solution))
|
|
f.write('{}|{}|{:.5f}|{}|{}|{}\n'.format(
|
|
desc, self.irp_count, score, self.irp, keylen, solution))
|
|
|
|
@staticmethod
|
|
def load(dbname):
|
|
if not os.path.isfile(LPath.db(dbname)):
|
|
return {}
|
|
ret = {}
|
|
with open(LPath.db(dbname), 'r') as f:
|
|
for line in f.readlines():
|
|
if line.startswith('#'):
|
|
continue
|
|
line = line.rstrip()
|
|
name, irpc, score, irp, kl, nums = [x for x in line.split('|')]
|
|
val = [int(irpc), float(score), int(irp), int(kl)]
|
|
val.append([int(x) for x in nums.split(',')] if nums else [])
|
|
try:
|
|
ret[name].append(val)
|
|
except KeyError:
|
|
ret[name] = [val]
|
|
return ret
|
|
|
|
@staticmethod
|
|
def load_scores(dbname):
|
|
scores = {} # {fname: [irp0_[kl0, kl1, ...], irp1_[...]]}
|
|
for name, entries in InterruptDB.load(dbname).items():
|
|
for irpc, score, irp, kl, nums in entries:
|
|
if name not in scores:
|
|
scores[name] = [[] for _ in range(29)]
|
|
part = scores[name][irp]
|
|
while kl >= len(part):
|
|
part.append((0, 0)) # (score, irp_count)
|
|
oldc = part[kl][1]
|
|
if irpc > oldc or (irpc == oldc and score > part[kl][0]):
|
|
part[kl] = (score, irpc)
|
|
return scores
|
|
|
|
|
|
#########################################
|
|
# helper functions
|
|
#########################################
|
|
|
|
|
|
def get_db(fname, irp, max_irp):
|
|
stops, Z = InterruptIndices().consider(fname, irp, max_irp)
|
|
data = RuneTextFile(LPath.page(fname)).index_no_white[:Z]
|
|
return InterruptDB(data, irp, irp_stops=stops)
|
|
|
|
|
|
def enum_db_irps(dbname, fn_score, max_irp=20, irpset=[0, 28],
|
|
klset=range(1, 33), files=FILES_UNSOLVED, fn_load_db=get_db):
|
|
oldValues = {k: set((a, b, c) for a, _, b, c, _ in v)
|
|
for k, v in InterruptDB.load(dbname).items()}
|
|
for irp in irpset: # interrupt rune index
|
|
for fname in files:
|
|
db = fn_load_db(fname, irp, max_irp)
|
|
print('load:', fname, 'interrupt:', irp, 'count:', db.irp_count)
|
|
for keylen in klset: # key length
|
|
if (db.irp_count, irp, keylen) in oldValues.get(fname, []):
|
|
print(f'{keylen}: skipped.')
|
|
continue
|
|
score, skips = db.find_best_solution(fn_score, keylen)
|
|
yield db, fname, score, keylen, skips
|
|
|
|
|
|
def create_primary(dbname, fn_score):
|
|
for db, fname, score, kl, skips in enum_db_irps(dbname, fn_score,
|
|
irpset=range(29),
|
|
files=FILES_ALL):
|
|
db.write(dbname, fname, score, kl, skips)
|
|
print(f'{kl}: {score:.4f}, solutions: {len(skips)}')
|
|
|
|
|
|
def create_secondary(db_in, db_out, fn_score, threshold=0.75, max_irp=20):
|
|
search_set = set()
|
|
for fname, arr in InterruptDB.load(db_in).items():
|
|
if fname in FILES_UNSOLVED:
|
|
for irpc, score, irp, kl, nums in arr:
|
|
if score > threshold and kl > 3 and kl < 26:
|
|
search_set.add((fname, irp, kl))
|
|
print('searching through', len(search_set), 'candidates.')
|
|
for fname, irp, kl in search_set:
|
|
print('load:', fname, 'interrupt:', irp, 'keylen:', kl)
|
|
scores = []
|
|
|
|
def fn_keep_scores(x, kl):
|
|
score = fn_score(x, kl)
|
|
if score >= threshold:
|
|
scores.append(score) # hacky but gets the job done
|
|
return 1
|
|
return -1
|
|
|
|
db = get_db(fname, irp, max_irp)
|
|
_, skips = db.find_best_solution(fn_keep_scores, kl)
|
|
ret = list(zip(scores, skips))
|
|
bestscore = max(ret)[0]
|
|
# exclude best results, as they are already present in the main db
|
|
filtered = [x for x in ret if x[0] < bestscore]
|
|
for score, nums in filtered:
|
|
db.write(db_out, fname, score, kl, [nums])
|
|
print('found', len(filtered), 'additional solutions')
|
|
|
|
|
|
def create_mod_a_db(dbprefix, fn_score):
|
|
for mod, upto in [(2, 13), (3, 8)]:
|
|
for mo in range(mod):
|
|
# if needed add combined check for all modulo parts
|
|
def xor_split(data, keylen):
|
|
return fn_score(data[mo::mod], keylen)
|
|
|
|
dbname = f'db_{dbprefix}_mod_a_{mod}.{mo}'
|
|
for db, fname, score, kl, skips in enum_db_irps(
|
|
dbname, xor_split, klset=range(1, upto + 1)):
|
|
db.write(dbname, fname, score, kl, skips)
|
|
print(f'mod a {mod}.{mo}, kl: {kl}, score: {score:.4f}')
|
|
|
|
|
|
def create_mod_b_db(dbprefix, fn_score):
|
|
db_i = InterruptIndices()
|
|
for mod, upto in [(2, 18), (3, 18)]:
|
|
for mo in range(mod):
|
|
# custom modulo data load function
|
|
def db_load_mod(fname, irp, max_irp):
|
|
stops, Z = db_i.consider_mod_b(fname, irp, max_irp, mod)
|
|
stops = stops[mo]
|
|
Z = Z[mo]
|
|
data = RuneTextFile(LPath.page(fname)).index_no_white
|
|
data = data[mo::mod][:Z]
|
|
return InterruptDB(data, irp, irp_stops=stops)
|
|
|
|
dbname = f'db_{dbprefix}_mod_b_{mod}.{mo}'
|
|
for db, fname, score, kl, skips in enum_db_irps(
|
|
dbname, fn_score, klset=range(2, upto + 1),
|
|
fn_load_db=db_load_mod):
|
|
db.write(dbname, fname, score, kl, skips)
|
|
print(f'mod b {mod}.{mo}, kl: {kl}, score: {score:.4f}')
|
|
|
|
|
|
def create_pattern_shift_db(dbprefix, fn_score, offset=0):
|
|
# we misuse the db's keylen column as pattern shift multiply
|
|
for kpl in range(4, 19): # key pattern length, equiv. to x^2 vigenere
|
|
def fn_pattern_scr(x, kpl_shift):
|
|
gen = GuessPattern.shift_pattern(kpl, kpl_shift)
|
|
parts = GuessPattern.groups(x, kpl, gen, offset)
|
|
return fn_score(parts, kpl)
|
|
|
|
dbname = f'db_{dbprefix}_pattern_shift_{kpl}.{offset}'
|
|
for db, fname, score, kl, skips in enum_db_irps(dbname, fn_pattern_scr,
|
|
irpset=[0, 28],
|
|
klset=range(1, kpl)):
|
|
db.write(dbname, fname, score, kl, skips)
|
|
print(f'shift_pattern {kpl}.{offset}'
|
|
f', shift: {kl}, score: {score:.4f}')
|
|
|
|
|
|
def create_pattern_mirror_db(dbprefix, fn_score, offset=0):
|
|
for typ, generator in [('a', GuessPattern.mirror_pattern_a),
|
|
('b', GuessPattern.mirror_pattern_b)]:
|
|
def fn_mirror_scr(x, kl):
|
|
parts = GuessPattern.groups(x, kl, generator(kl), offset)
|
|
return fn_score(parts, kl)
|
|
|
|
dbname = f'db_{dbprefix}_pattern_mirror_{typ}.{offset}'
|
|
for db, fname, score, kl, skips in enum_db_irps(dbname, fn_mirror_scr,
|
|
irpset=[0, 28],
|
|
klset=range(4, 19)):
|
|
db.write(dbname, fname, score, kl, skips)
|
|
print(f'mirror_pattern {typ}.{offset}'
|
|
f', kl: {kl}, score: {score:.4f}')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# create_primary('db_high', Probability.IC_w_keylen)
|
|
# create_primary('db_norm', Probability.target_diff)
|
|
# create_mod_a_db('high', Probability.IC_w_keylen)
|
|
# create_mod_a_db('norm', Probability.target_diff)
|
|
# create_mod_b_db('high', Probability.IC_w_keylen)
|
|
# create_mod_b_db('norm', Probability.target_diff)
|
|
# create_pattern_shift_db('high', Probability.parts_high, offset=0)
|
|
# create_pattern_shift_db('norm', Probability.parts_norm, offset=0)
|
|
create_pattern_mirror_db('high', Probability.parts_high, offset=0)
|
|
create_pattern_mirror_db('norm', Probability.parts_norm, offset=0)
|
|
# create_secondary('db_high', 'db_high_secondary',
|
|
# Probability.IC_w_keylen, threshold=1.4)
|
|
# create_secondary('db_norm', 'db_norm_secondary',
|
|
# Probability.target_diff, threshold=0.55)
|