From a3255997ac045dbf5f053e10174643bc50453fd2 Mon Sep 17 00:00:00 2001 From: relikd Date: Wed, 17 Feb 2021 18:57:53 +0100 Subject: [PATCH] IoC for patterns --- LP/InterruptDB.py | 194 ++++++++++++++++++++++++---------------------- LP/KeySearch.py | 51 +++++++----- probability.py | 22 ++---- 3 files changed, 141 insertions(+), 126 deletions(-) diff --git a/LP/InterruptDB.py b/LP/InterruptDB.py index fd93fce..6a1c89d 100755 --- a/LP/InterruptDB.py +++ b/LP/InterruptDB.py @@ -6,6 +6,7 @@ from InterruptIndices import InterruptIndices from Probability import Probability from RuneText import RuneTextFile from LPath import FILES_ALL, FILES_UNSOLVED, LPath +from KeySearch import GuessPattern ######################################### @@ -18,40 +19,21 @@ class InterruptDB(object): self.iguess = InterruptSearch(data, irp=interrupt, irp_stops=irp_stops) self.irp_count = len(self.iguess.stops) - def make(self, dbname, name, keylen, fn_score): + def find_best_solution(self, fn_score, keylen): if keylen == 0: # without interrupts score, skips = fn_score(self.iguess.join(), 1), [[]] else: score, skips = self.iguess.all(keylen, fn_score) for i, interrupts in enumerate(skips): skips[i] = self.iguess.to_occurrence_index(interrupts) - - for nums in skips: - self.write( - name, score, self.irp, self.irp_count, keylen, nums, dbname) return score, skips - def make_secondary(self, dbname, name, keylen, fn_score, threshold): - scores = [] - - def fn(x, kl): - score = fn_score(x, kl) - if score >= threshold: - scores.append(score) - return 1 - return -1 - - _, skips = self.iguess.all(keylen, fn) - for i, interrupts in enumerate(skips): - skips[i] = self.iguess.to_occurrence_index(interrupts) - ret = list(zip(scores, skips)) - bestscore = max(ret)[0] - # exclude best results, as they are already present in the main db - filtered = [x for x in ret if x[0] < bestscore] - for score, nums in filtered: - self.write( - name, score, self.irp, self.irp_count, keylen, nums, dbname) - return len(filtered) + def write(self, dbname, desc, score, keylen, nums): + with open(LPath.db(dbname), 'a') as f: + for solution in nums: + solution = ','.join(map(str, solution)) + f.write('{}|{}|{:.5f}|{}|{}|{}\n'.format( + desc, self.irp_count, score, self.irp, keylen, solution)) @staticmethod def load(dbname): @@ -75,11 +57,11 @@ class InterruptDB(object): @staticmethod def load_scores(dbname): scores = {} # {fname: [irp0_[kl0, kl1, ...], irp1_[...]]} - for k, v in InterruptDB.load(dbname).items(): - for irpc, score, irp, kl, nums in v: - if k not in scores: - scores[k] = [[] for _ in range(29)] - part = scores[k][irp] + for name, entries in InterruptDB.load(dbname).items(): + for irpc, score, irp, kl, nums in entries: + if name not in scores: + scores[name] = [[] for _ in range(29)] + part = scores[name][irp] while kl >= len(part): part.append((0, 0)) # (score, irp_count) oldc = part[kl][1] @@ -87,101 +69,131 @@ class InterruptDB(object): part[kl] = (score, irpc) return scores - @staticmethod - def write(name, score, irp, irpmax, keylen, nums, dbname='db_main'): - with open(LPath.db(dbname), 'a') as f: - nums = ','.join(map(str, nums)) - f.write(f'{name}|{irpmax}|{score:.5f}|{irp}|{keylen}|{nums}\n') - ######################################### # helper functions ######################################### + def get_db(fname, irp, max_irp): stops, Z = InterruptIndices().consider(fname, irp, max_irp) data = RuneTextFile(LPath.page(fname)).index_no_white[:Z] return InterruptDB(data, irp, irp_stops=stops) -def create_primary(dbname, fn_score, klset=range(1, 33), - max_irp=20, irpset=range(29), files=FILES_ALL): - oldDB = InterruptDB.load(dbname) +def enum_db_irps(dbname, fn_score, max_irp=20, irpset=[0, 28], + klset=range(1, 33), files=FILES_UNSOLVED, fn_load_db=get_db): oldValues = {k: set((a, b, c) for a, _, b, c, _ in v) - for k, v in oldDB.items()} + for k, v in InterruptDB.load(dbname).items()} for irp in irpset: # interrupt rune index - for name in files: - db = get_db(name, irp, max_irp) - print('load:', name, 'interrupt:', irp, 'count:', db.irp_count) + for fname in files: + db = fn_load_db(fname, irp, max_irp) + print('load:', fname, 'interrupt:', irp, 'count:', db.irp_count) for keylen in klset: # key length - if (db.irp_count, irp, keylen) in oldValues.get(name, []): + if (db.irp_count, irp, keylen) in oldValues.get(fname, []): print(f'{keylen}: skipped.') continue - score, interrupts = db.make(dbname, name, keylen, fn_score) - print(f'{keylen}: {score:.4f}, solutions: {len(interrupts)}') + score, skips = db.find_best_solution(fn_score, keylen) + yield db, fname, score, keylen, skips + + +def create_primary(dbname, fn_score): + for db, fname, score, kl, skips in enum_db_irps(dbname, fn_score, + irpset=range(29), + files=FILES_ALL): + db.write(dbname, fname, score, kl, skips) + print(f'{kl}: {score:.4f}, solutions: {len(skips)}') def create_secondary(db_in, db_out, fn_score, threshold=0.75, max_irp=20): - oldDB = InterruptDB.load(db_in) search_set = set() - for name, arr in oldDB.items(): - if name not in FILES_UNSOLVED: - continue - for irpc, score, irp, kl, nums in arr: - if score <= threshold or kl > 26 or kl < 3: - continue - search_set.add((name, irp, kl)) - print('searching through', len(search_set), 'files.') - for name, irp, kl in search_set: - print('load:', name, 'interrupt:', irp, 'keylen:', kl) - db = get_db(name, irp, max_irp) - c = db.make_secondary(db_out, name, kl, fn_score, threshold) - print('found', c, 'additional solutions') + for fname, arr in InterruptDB.load(db_in).items(): + if fname in FILES_UNSOLVED: + for irpc, score, irp, kl, nums in arr: + if score > threshold and kl > 3 and kl < 26: + search_set.add((fname, irp, kl)) + print('searching through', len(search_set), 'candidates.') + for fname, irp, kl in search_set: + print('load:', fname, 'interrupt:', irp, 'keylen:', kl) + scores = [] + + def fn_keep_scores(x, kl): + score = fn_score(x, kl) + if score >= threshold: + scores.append(score) # hacky but gets the job done + return 1 + return -1 + + db = get_db(fname, irp, max_irp) + _, skips = db.find_best_solution(fn_keep_scores, kl) + ret = list(zip(scores, skips)) + bestscore = max(ret)[0] + # exclude best results, as they are already present in the main db + filtered = [x for x in ret if x[0] < bestscore] + for score, nums in filtered: + db.write(db_out, fname, score, kl, [nums]) + print('found', len(filtered), 'additional solutions') -def create_mod_a_db(dbprefix, fn_score, klpairs, max_irp=20, irpset=[0, 28]): - for mod, upto in klpairs: +def create_mod_a_db(dbprefix, fn_score): + for mod, upto in [(2, 13), (3, 8)]: for mo in range(mod): # if needed add combined check for all modulo parts def xor_split(data, keylen): return fn_score(data[mo::mod], keylen) - create_primary(f'db_{dbprefix}_mod_a_{mod}.{mo}', xor_split, - range(1, upto + 1), max_irp, irpset, FILES_UNSOLVED) + dbname = f'db_{dbprefix}_mod_a_{mod}.{mo}' + for db, fname, score, kl, skips in enum_db_irps( + dbname, xor_split, klset=range(1, upto + 1)): + db.write(dbname, fname, score, kl, skips) + print(f'mod a {mod}.{mo}, kl: {kl}, score: {score:.4f}') -def create_mod_b_db(dbprefix, fn_score, klpairs, max_irp=20, irpset=[0, 28]): +def create_mod_b_db(dbprefix, fn_score): db_i = InterruptIndices() - for mod, upto in klpairs: + for mod, upto in [(2, 18), (3, 18)]: for mo in range(mod): - dbname = f'db_{dbprefix}_mod_b_{mod}.{mo}' - oldDB = {k: set((a, b, c) for a, _, b, c, _ in v) - for k, v in InterruptDB.load(dbname).items()} + # custom modulo data load function + def db_load_mod(fname, irp, max_irp): + stops, Z = db_i.consider_mod_b(fname, irp, max_irp, mod) + stops = stops[mo] + Z = Z[mo] + data = RuneTextFile(LPath.page(fname)).index_no_white + data = data[mo::mod][:Z] + return InterruptDB(data, irp, irp_stops=stops) - for irp in irpset: # interrupt rune index - for name in FILES_UNSOLVED: - stops, Z = db_i.consider_mod_b(name, irp, max_irp, mod) - stops = stops[mo] - Z = Z[mo] - data = RuneTextFile(LPath.page(name)).index_no_white - data = data[mo::mod][:Z] - db = InterruptDB(data, irp, irp_stops=stops) - print(f'load: {name} interrupt: {irp} count: {len(stops)}') - for keylen in range(2, upto + 1): # key length - if (db.irp_count, irp, keylen) in oldDB.get(name, []): - print(f'{keylen}: skipped.') - continue - score, irps = db.make(dbname, name, keylen, fn_score) - print(f'{keylen}: {score:.4f}, solutions: {len(irps)}') + dbname = f'db_{dbprefix}_mod_b_{mod}.{mo}' + for db, fname, score, kl, skips in enum_db_irps( + dbname, fn_score, klset=range(2, upto + 1), + fn_load_db=db_load_mod): + db.write(dbname, fname, score, kl, skips) + print(f'mod b {mod}.{mo}, kl: {kl}, score: {score:.4f}') + + +def create_pattern_shift_db(offset=0): + # we misuse the db's keylen column as pattern shift multiply + for kpl in range(4, 19): # key pattern length, equiv. to x^2 vigenere + def fn_score(x, kpl_shift): + parts = GuessPattern.groups(x, kpl, kpl_shift, offset) + return sum(Probability(x).IC() for x in parts) / kpl + # return 1 - (sum(Probability(x).IC_norm() for x in parts) / kl) + + dbname = f'db_high_pattern_shift_{kpl}.{offset}' + for db, fname, score, kl, skips in enum_db_irps(dbname, fn_score, + irpset=[0], + klset=range(1, kpl)): + db.write(dbname, fname, score, kl, skips) + print(f'shift_pattern {kpl}.{offset}, shift: {kl}, score: {score:.4f}') if __name__ == '__main__': - create_primary('db_high', Probability.IC_w_keylen, max_irp=20) - create_primary('db_norm', Probability.target_diff, max_irp=20) - create_mod_a_db('high', Probability.IC_w_keylen, [(2, 13), (3, 8)]) - create_mod_a_db('norm', Probability.target_diff, [(2, 13), (3, 8)]) - create_mod_b_db('high', Probability.IC_w_keylen, [(2, 18), (3, 18)]) - create_mod_b_db('norm', Probability.target_diff, [(2, 18), (3, 18)]) + # create_primary('db_high', Probability.IC_w_keylen) + # create_primary('db_norm', Probability.target_diff) + # create_mod_a_db('high', Probability.IC_w_keylen) + # create_mod_a_db('norm', Probability.target_diff) + # create_mod_b_db('high', Probability.IC_w_keylen) + # create_mod_b_db('norm', Probability.target_diff) + create_pattern_shift_db(offset=0) # create_secondary('db_high', 'db_high_secondary', # Probability.IC_w_keylen, threshold=1.4) # create_secondary('db_norm', 'db_norm_secondary', diff --git a/LP/KeySearch.py b/LP/KeySearch.py index 8269b0a..afbe7ad 100755 --- a/LP/KeySearch.py +++ b/LP/KeySearch.py @@ -60,27 +60,38 @@ class GuessAffine(object): ######################################### class GuessPattern(object): - def __init__(self, nums): - self.nums = nums + @staticmethod + def groups(nums, keylen, shift=1, offset=0): + gen = GuessPattern.shift_pattern(keylen, shift) + for _ in range(offset): + next(gen) + ret = [[] for _ in range(keylen)] + for idx, value in zip(gen, nums): + ret[idx].append(value) + return ret + + def shift_pattern(kl, shift=1): # shift by (more than) one, 012201120 + for i in range(10000): + p = (i * shift) % kl + yield from range(p, kl) + yield from range(p) + + def mirror_pattern_a(kl): # mirrored, 012210012210 + for i in range(10000): + yield from range(kl) + yield from range(kl - 1, -1, -1) + + def mirror_pattern_b(kl): # mirrored, 012101210 + for i in range(10000): + yield from range(kl) + yield from range(kl - 2, 0, -1) @staticmethod - def pattern(keylen, fn_pattern): - mask = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'[:keylen] - return fn_pattern(mask, keylen) - - def split(self, keylen, mask, offset=0): - ret = {} + def zip(nums, key, keylen, shift=1, offset=0): + gen = GuessPattern.shift_pattern(keylen, shift) for _ in range(offset): - next(mask) - ret = {k: [] for k in '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'[:keylen]} - for n, k in zip(self.nums, mask): - ret[k].append(n) - return ret.values() - - def zip(self, key_mask, offset=0): - for _ in range(offset): - next(key_mask) - return [(n - k) % 29 for n, k in zip(self.nums, key_mask)] + next(gen) + return [(n - key[k]) % 29 for n, k in zip(nums, gen)] @staticmethod def guess(parts, score_fn): # minimize score_fn @@ -97,3 +108,7 @@ class GuessPattern(object): avg_score += best found.append(candidate) return avg_score / len(parts), found + + +if __name__ == '__main__': + print(list(GuessPattern.shift_pattern(4, 3))[:20]) diff --git a/probability.py b/probability.py index cbb737b..568bb07 100755 --- a/probability.py +++ b/probability.py @@ -84,27 +84,15 @@ def pattern_solver(fname, irp=0): def fn_similarity(x): return LP.Probability(x).similarity() - def fn_pattern_mirror(x, kl): - for i in range(10000): # mirrored, 012210012210 or 012101210 - yield from x - # yield from x[::-1] - yield from x[::-1][1:-1] - prnt_fmt = 'kl: {}, pattern-n: {}, IoC: {:.3f}, dist: {:.4f}, offset: {}, key: {}' print(fname) - gr = LP.GuessPattern(data) + # gr = LP.GuessPattern(data) for kl in range(3, 19): - for pattern_shift in range(1, kl): - def fn_pattern_shift(x, kl): # shift by (more than) one, 012201120 - for i in range(10000): - yield from x[(i * pattern_shift) % kl:] - yield from x[:(i * pattern_shift) % kl] - + for kl_shift in range(1, kl): # Find proper pattern res = [] for offset in range(kl): # up to keylen offset - mask = LP.GuessPattern.pattern(kl, fn_pattern_shift) - parts = gr.split(kl, mask, offset) + parts = LP.GuessPattern.groups(data, kl, kl_shift, offset) score = sum(LP.Probability(x).IC() for x in parts) / kl if score > 1.6 and score < 2.1: res.append((score, parts, offset)) @@ -113,9 +101,9 @@ def pattern_solver(fname, irp=0): for score, parts, off in res: sc, key = LP.GuessPattern.guess(parts, fn_similarity) if sc < 0.1: - print(prnt_fmt.format(kl, pattern_shift, score, sc, off, + print(prnt_fmt.format(kl, kl_shift, score, sc, off, LP.RuneText(key).text)) - solved = gr.zip(fn_pattern_shift(key, kl), off) + solved = LP.GuessPattern.zip(data, key, kl, kl_shift, off) for i in whitespace_i: solved.insert(i, 29) print(' ', LP.RuneText(solved).text)