IoC for patterns
This commit is contained in:
@@ -6,6 +6,7 @@ from InterruptIndices import InterruptIndices
|
|||||||
from Probability import Probability
|
from Probability import Probability
|
||||||
from RuneText import RuneTextFile
|
from RuneText import RuneTextFile
|
||||||
from LPath import FILES_ALL, FILES_UNSOLVED, LPath
|
from LPath import FILES_ALL, FILES_UNSOLVED, LPath
|
||||||
|
from KeySearch import GuessPattern
|
||||||
|
|
||||||
|
|
||||||
#########################################
|
#########################################
|
||||||
@@ -18,40 +19,21 @@ class InterruptDB(object):
|
|||||||
self.iguess = InterruptSearch(data, irp=interrupt, irp_stops=irp_stops)
|
self.iguess = InterruptSearch(data, irp=interrupt, irp_stops=irp_stops)
|
||||||
self.irp_count = len(self.iguess.stops)
|
self.irp_count = len(self.iguess.stops)
|
||||||
|
|
||||||
def make(self, dbname, name, keylen, fn_score):
|
def find_best_solution(self, fn_score, keylen):
|
||||||
if keylen == 0: # without interrupts
|
if keylen == 0: # without interrupts
|
||||||
score, skips = fn_score(self.iguess.join(), 1), [[]]
|
score, skips = fn_score(self.iguess.join(), 1), [[]]
|
||||||
else:
|
else:
|
||||||
score, skips = self.iguess.all(keylen, fn_score)
|
score, skips = self.iguess.all(keylen, fn_score)
|
||||||
for i, interrupts in enumerate(skips):
|
for i, interrupts in enumerate(skips):
|
||||||
skips[i] = self.iguess.to_occurrence_index(interrupts)
|
skips[i] = self.iguess.to_occurrence_index(interrupts)
|
||||||
|
|
||||||
for nums in skips:
|
|
||||||
self.write(
|
|
||||||
name, score, self.irp, self.irp_count, keylen, nums, dbname)
|
|
||||||
return score, skips
|
return score, skips
|
||||||
|
|
||||||
def make_secondary(self, dbname, name, keylen, fn_score, threshold):
|
def write(self, dbname, desc, score, keylen, nums):
|
||||||
scores = []
|
with open(LPath.db(dbname), 'a') as f:
|
||||||
|
for solution in nums:
|
||||||
def fn(x, kl):
|
solution = ','.join(map(str, solution))
|
||||||
score = fn_score(x, kl)
|
f.write('{}|{}|{:.5f}|{}|{}|{}\n'.format(
|
||||||
if score >= threshold:
|
desc, self.irp_count, score, self.irp, keylen, solution))
|
||||||
scores.append(score)
|
|
||||||
return 1
|
|
||||||
return -1
|
|
||||||
|
|
||||||
_, skips = self.iguess.all(keylen, fn)
|
|
||||||
for i, interrupts in enumerate(skips):
|
|
||||||
skips[i] = self.iguess.to_occurrence_index(interrupts)
|
|
||||||
ret = list(zip(scores, skips))
|
|
||||||
bestscore = max(ret)[0]
|
|
||||||
# exclude best results, as they are already present in the main db
|
|
||||||
filtered = [x for x in ret if x[0] < bestscore]
|
|
||||||
for score, nums in filtered:
|
|
||||||
self.write(
|
|
||||||
name, score, self.irp, self.irp_count, keylen, nums, dbname)
|
|
||||||
return len(filtered)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def load(dbname):
|
def load(dbname):
|
||||||
@@ -75,11 +57,11 @@ class InterruptDB(object):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def load_scores(dbname):
|
def load_scores(dbname):
|
||||||
scores = {} # {fname: [irp0_[kl0, kl1, ...], irp1_[...]]}
|
scores = {} # {fname: [irp0_[kl0, kl1, ...], irp1_[...]]}
|
||||||
for k, v in InterruptDB.load(dbname).items():
|
for name, entries in InterruptDB.load(dbname).items():
|
||||||
for irpc, score, irp, kl, nums in v:
|
for irpc, score, irp, kl, nums in entries:
|
||||||
if k not in scores:
|
if name not in scores:
|
||||||
scores[k] = [[] for _ in range(29)]
|
scores[name] = [[] for _ in range(29)]
|
||||||
part = scores[k][irp]
|
part = scores[name][irp]
|
||||||
while kl >= len(part):
|
while kl >= len(part):
|
||||||
part.append((0, 0)) # (score, irp_count)
|
part.append((0, 0)) # (score, irp_count)
|
||||||
oldc = part[kl][1]
|
oldc = part[kl][1]
|
||||||
@@ -87,101 +69,131 @@ class InterruptDB(object):
|
|||||||
part[kl] = (score, irpc)
|
part[kl] = (score, irpc)
|
||||||
return scores
|
return scores
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def write(name, score, irp, irpmax, keylen, nums, dbname='db_main'):
|
|
||||||
with open(LPath.db(dbname), 'a') as f:
|
|
||||||
nums = ','.join(map(str, nums))
|
|
||||||
f.write(f'{name}|{irpmax}|{score:.5f}|{irp}|{keylen}|{nums}\n')
|
|
||||||
|
|
||||||
|
|
||||||
#########################################
|
#########################################
|
||||||
# helper functions
|
# helper functions
|
||||||
#########################################
|
#########################################
|
||||||
|
|
||||||
|
|
||||||
def get_db(fname, irp, max_irp):
|
def get_db(fname, irp, max_irp):
|
||||||
stops, Z = InterruptIndices().consider(fname, irp, max_irp)
|
stops, Z = InterruptIndices().consider(fname, irp, max_irp)
|
||||||
data = RuneTextFile(LPath.page(fname)).index_no_white[:Z]
|
data = RuneTextFile(LPath.page(fname)).index_no_white[:Z]
|
||||||
return InterruptDB(data, irp, irp_stops=stops)
|
return InterruptDB(data, irp, irp_stops=stops)
|
||||||
|
|
||||||
|
|
||||||
def create_primary(dbname, fn_score, klset=range(1, 33),
|
def enum_db_irps(dbname, fn_score, max_irp=20, irpset=[0, 28],
|
||||||
max_irp=20, irpset=range(29), files=FILES_ALL):
|
klset=range(1, 33), files=FILES_UNSOLVED, fn_load_db=get_db):
|
||||||
oldDB = InterruptDB.load(dbname)
|
|
||||||
oldValues = {k: set((a, b, c) for a, _, b, c, _ in v)
|
oldValues = {k: set((a, b, c) for a, _, b, c, _ in v)
|
||||||
for k, v in oldDB.items()}
|
for k, v in InterruptDB.load(dbname).items()}
|
||||||
for irp in irpset: # interrupt rune index
|
for irp in irpset: # interrupt rune index
|
||||||
for name in files:
|
for fname in files:
|
||||||
db = get_db(name, irp, max_irp)
|
db = fn_load_db(fname, irp, max_irp)
|
||||||
print('load:', name, 'interrupt:', irp, 'count:', db.irp_count)
|
print('load:', fname, 'interrupt:', irp, 'count:', db.irp_count)
|
||||||
for keylen in klset: # key length
|
for keylen in klset: # key length
|
||||||
if (db.irp_count, irp, keylen) in oldValues.get(name, []):
|
if (db.irp_count, irp, keylen) in oldValues.get(fname, []):
|
||||||
print(f'{keylen}: skipped.')
|
print(f'{keylen}: skipped.')
|
||||||
continue
|
continue
|
||||||
score, interrupts = db.make(dbname, name, keylen, fn_score)
|
score, skips = db.find_best_solution(fn_score, keylen)
|
||||||
print(f'{keylen}: {score:.4f}, solutions: {len(interrupts)}')
|
yield db, fname, score, keylen, skips
|
||||||
|
|
||||||
|
|
||||||
|
def create_primary(dbname, fn_score):
|
||||||
|
for db, fname, score, kl, skips in enum_db_irps(dbname, fn_score,
|
||||||
|
irpset=range(29),
|
||||||
|
files=FILES_ALL):
|
||||||
|
db.write(dbname, fname, score, kl, skips)
|
||||||
|
print(f'{kl}: {score:.4f}, solutions: {len(skips)}')
|
||||||
|
|
||||||
|
|
||||||
def create_secondary(db_in, db_out, fn_score, threshold=0.75, max_irp=20):
|
def create_secondary(db_in, db_out, fn_score, threshold=0.75, max_irp=20):
|
||||||
oldDB = InterruptDB.load(db_in)
|
|
||||||
search_set = set()
|
search_set = set()
|
||||||
for name, arr in oldDB.items():
|
for fname, arr in InterruptDB.load(db_in).items():
|
||||||
if name not in FILES_UNSOLVED:
|
if fname in FILES_UNSOLVED:
|
||||||
continue
|
for irpc, score, irp, kl, nums in arr:
|
||||||
for irpc, score, irp, kl, nums in arr:
|
if score > threshold and kl > 3 and kl < 26:
|
||||||
if score <= threshold or kl > 26 or kl < 3:
|
search_set.add((fname, irp, kl))
|
||||||
continue
|
print('searching through', len(search_set), 'candidates.')
|
||||||
search_set.add((name, irp, kl))
|
for fname, irp, kl in search_set:
|
||||||
print('searching through', len(search_set), 'files.')
|
print('load:', fname, 'interrupt:', irp, 'keylen:', kl)
|
||||||
for name, irp, kl in search_set:
|
scores = []
|
||||||
print('load:', name, 'interrupt:', irp, 'keylen:', kl)
|
|
||||||
db = get_db(name, irp, max_irp)
|
def fn_keep_scores(x, kl):
|
||||||
c = db.make_secondary(db_out, name, kl, fn_score, threshold)
|
score = fn_score(x, kl)
|
||||||
print('found', c, 'additional solutions')
|
if score >= threshold:
|
||||||
|
scores.append(score) # hacky but gets the job done
|
||||||
|
return 1
|
||||||
|
return -1
|
||||||
|
|
||||||
|
db = get_db(fname, irp, max_irp)
|
||||||
|
_, skips = db.find_best_solution(fn_keep_scores, kl)
|
||||||
|
ret = list(zip(scores, skips))
|
||||||
|
bestscore = max(ret)[0]
|
||||||
|
# exclude best results, as they are already present in the main db
|
||||||
|
filtered = [x for x in ret if x[0] < bestscore]
|
||||||
|
for score, nums in filtered:
|
||||||
|
db.write(db_out, fname, score, kl, [nums])
|
||||||
|
print('found', len(filtered), 'additional solutions')
|
||||||
|
|
||||||
|
|
||||||
def create_mod_a_db(dbprefix, fn_score, klpairs, max_irp=20, irpset=[0, 28]):
|
def create_mod_a_db(dbprefix, fn_score):
|
||||||
for mod, upto in klpairs:
|
for mod, upto in [(2, 13), (3, 8)]:
|
||||||
for mo in range(mod):
|
for mo in range(mod):
|
||||||
# if needed add combined check for all modulo parts
|
# if needed add combined check for all modulo parts
|
||||||
def xor_split(data, keylen):
|
def xor_split(data, keylen):
|
||||||
return fn_score(data[mo::mod], keylen)
|
return fn_score(data[mo::mod], keylen)
|
||||||
|
|
||||||
create_primary(f'db_{dbprefix}_mod_a_{mod}.{mo}', xor_split,
|
dbname = f'db_{dbprefix}_mod_a_{mod}.{mo}'
|
||||||
range(1, upto + 1), max_irp, irpset, FILES_UNSOLVED)
|
for db, fname, score, kl, skips in enum_db_irps(
|
||||||
|
dbname, xor_split, klset=range(1, upto + 1)):
|
||||||
|
db.write(dbname, fname, score, kl, skips)
|
||||||
|
print(f'mod a {mod}.{mo}, kl: {kl}, score: {score:.4f}')
|
||||||
|
|
||||||
|
|
||||||
def create_mod_b_db(dbprefix, fn_score, klpairs, max_irp=20, irpset=[0, 28]):
|
def create_mod_b_db(dbprefix, fn_score):
|
||||||
db_i = InterruptIndices()
|
db_i = InterruptIndices()
|
||||||
for mod, upto in klpairs:
|
for mod, upto in [(2, 18), (3, 18)]:
|
||||||
for mo in range(mod):
|
for mo in range(mod):
|
||||||
dbname = f'db_{dbprefix}_mod_b_{mod}.{mo}'
|
# custom modulo data load function
|
||||||
oldDB = {k: set((a, b, c) for a, _, b, c, _ in v)
|
def db_load_mod(fname, irp, max_irp):
|
||||||
for k, v in InterruptDB.load(dbname).items()}
|
stops, Z = db_i.consider_mod_b(fname, irp, max_irp, mod)
|
||||||
|
stops = stops[mo]
|
||||||
|
Z = Z[mo]
|
||||||
|
data = RuneTextFile(LPath.page(fname)).index_no_white
|
||||||
|
data = data[mo::mod][:Z]
|
||||||
|
return InterruptDB(data, irp, irp_stops=stops)
|
||||||
|
|
||||||
for irp in irpset: # interrupt rune index
|
dbname = f'db_{dbprefix}_mod_b_{mod}.{mo}'
|
||||||
for name in FILES_UNSOLVED:
|
for db, fname, score, kl, skips in enum_db_irps(
|
||||||
stops, Z = db_i.consider_mod_b(name, irp, max_irp, mod)
|
dbname, fn_score, klset=range(2, upto + 1),
|
||||||
stops = stops[mo]
|
fn_load_db=db_load_mod):
|
||||||
Z = Z[mo]
|
db.write(dbname, fname, score, kl, skips)
|
||||||
data = RuneTextFile(LPath.page(name)).index_no_white
|
print(f'mod b {mod}.{mo}, kl: {kl}, score: {score:.4f}')
|
||||||
data = data[mo::mod][:Z]
|
|
||||||
db = InterruptDB(data, irp, irp_stops=stops)
|
|
||||||
print(f'load: {name} interrupt: {irp} count: {len(stops)}')
|
def create_pattern_shift_db(offset=0):
|
||||||
for keylen in range(2, upto + 1): # key length
|
# we misuse the db's keylen column as pattern shift multiply
|
||||||
if (db.irp_count, irp, keylen) in oldDB.get(name, []):
|
for kpl in range(4, 19): # key pattern length, equiv. to x^2 vigenere
|
||||||
print(f'{keylen}: skipped.')
|
def fn_score(x, kpl_shift):
|
||||||
continue
|
parts = GuessPattern.groups(x, kpl, kpl_shift, offset)
|
||||||
score, irps = db.make(dbname, name, keylen, fn_score)
|
return sum(Probability(x).IC() for x in parts) / kpl
|
||||||
print(f'{keylen}: {score:.4f}, solutions: {len(irps)}')
|
# return 1 - (sum(Probability(x).IC_norm() for x in parts) / kl)
|
||||||
|
|
||||||
|
dbname = f'db_high_pattern_shift_{kpl}.{offset}'
|
||||||
|
for db, fname, score, kl, skips in enum_db_irps(dbname, fn_score,
|
||||||
|
irpset=[0],
|
||||||
|
klset=range(1, kpl)):
|
||||||
|
db.write(dbname, fname, score, kl, skips)
|
||||||
|
print(f'shift_pattern {kpl}.{offset}, shift: {kl}, score: {score:.4f}')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
create_primary('db_high', Probability.IC_w_keylen, max_irp=20)
|
# create_primary('db_high', Probability.IC_w_keylen)
|
||||||
create_primary('db_norm', Probability.target_diff, max_irp=20)
|
# create_primary('db_norm', Probability.target_diff)
|
||||||
create_mod_a_db('high', Probability.IC_w_keylen, [(2, 13), (3, 8)])
|
# create_mod_a_db('high', Probability.IC_w_keylen)
|
||||||
create_mod_a_db('norm', Probability.target_diff, [(2, 13), (3, 8)])
|
# create_mod_a_db('norm', Probability.target_diff)
|
||||||
create_mod_b_db('high', Probability.IC_w_keylen, [(2, 18), (3, 18)])
|
# create_mod_b_db('high', Probability.IC_w_keylen)
|
||||||
create_mod_b_db('norm', Probability.target_diff, [(2, 18), (3, 18)])
|
# create_mod_b_db('norm', Probability.target_diff)
|
||||||
|
create_pattern_shift_db(offset=0)
|
||||||
# create_secondary('db_high', 'db_high_secondary',
|
# create_secondary('db_high', 'db_high_secondary',
|
||||||
# Probability.IC_w_keylen, threshold=1.4)
|
# Probability.IC_w_keylen, threshold=1.4)
|
||||||
# create_secondary('db_norm', 'db_norm_secondary',
|
# create_secondary('db_norm', 'db_norm_secondary',
|
||||||
|
|||||||
@@ -60,27 +60,38 @@ class GuessAffine(object):
|
|||||||
#########################################
|
#########################################
|
||||||
|
|
||||||
class GuessPattern(object):
|
class GuessPattern(object):
|
||||||
def __init__(self, nums):
|
@staticmethod
|
||||||
self.nums = nums
|
def groups(nums, keylen, shift=1, offset=0):
|
||||||
|
gen = GuessPattern.shift_pattern(keylen, shift)
|
||||||
|
for _ in range(offset):
|
||||||
|
next(gen)
|
||||||
|
ret = [[] for _ in range(keylen)]
|
||||||
|
for idx, value in zip(gen, nums):
|
||||||
|
ret[idx].append(value)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def shift_pattern(kl, shift=1): # shift by (more than) one, 012201120
|
||||||
|
for i in range(10000):
|
||||||
|
p = (i * shift) % kl
|
||||||
|
yield from range(p, kl)
|
||||||
|
yield from range(p)
|
||||||
|
|
||||||
|
def mirror_pattern_a(kl): # mirrored, 012210012210
|
||||||
|
for i in range(10000):
|
||||||
|
yield from range(kl)
|
||||||
|
yield from range(kl - 1, -1, -1)
|
||||||
|
|
||||||
|
def mirror_pattern_b(kl): # mirrored, 012101210
|
||||||
|
for i in range(10000):
|
||||||
|
yield from range(kl)
|
||||||
|
yield from range(kl - 2, 0, -1)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def pattern(keylen, fn_pattern):
|
def zip(nums, key, keylen, shift=1, offset=0):
|
||||||
mask = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'[:keylen]
|
gen = GuessPattern.shift_pattern(keylen, shift)
|
||||||
return fn_pattern(mask, keylen)
|
|
||||||
|
|
||||||
def split(self, keylen, mask, offset=0):
|
|
||||||
ret = {}
|
|
||||||
for _ in range(offset):
|
for _ in range(offset):
|
||||||
next(mask)
|
next(gen)
|
||||||
ret = {k: [] for k in '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'[:keylen]}
|
return [(n - key[k]) % 29 for n, k in zip(nums, gen)]
|
||||||
for n, k in zip(self.nums, mask):
|
|
||||||
ret[k].append(n)
|
|
||||||
return ret.values()
|
|
||||||
|
|
||||||
def zip(self, key_mask, offset=0):
|
|
||||||
for _ in range(offset):
|
|
||||||
next(key_mask)
|
|
||||||
return [(n - k) % 29 for n, k in zip(self.nums, key_mask)]
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def guess(parts, score_fn): # minimize score_fn
|
def guess(parts, score_fn): # minimize score_fn
|
||||||
@@ -97,3 +108,7 @@ class GuessPattern(object):
|
|||||||
avg_score += best
|
avg_score += best
|
||||||
found.append(candidate)
|
found.append(candidate)
|
||||||
return avg_score / len(parts), found
|
return avg_score / len(parts), found
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
print(list(GuessPattern.shift_pattern(4, 3))[:20])
|
||||||
|
|||||||
@@ -84,27 +84,15 @@ def pattern_solver(fname, irp=0):
|
|||||||
def fn_similarity(x):
|
def fn_similarity(x):
|
||||||
return LP.Probability(x).similarity()
|
return LP.Probability(x).similarity()
|
||||||
|
|
||||||
def fn_pattern_mirror(x, kl):
|
|
||||||
for i in range(10000): # mirrored, 012210012210 or 012101210
|
|
||||||
yield from x
|
|
||||||
# yield from x[::-1]
|
|
||||||
yield from x[::-1][1:-1]
|
|
||||||
|
|
||||||
prnt_fmt = 'kl: {}, pattern-n: {}, IoC: {:.3f}, dist: {:.4f}, offset: {}, key: {}'
|
prnt_fmt = 'kl: {}, pattern-n: {}, IoC: {:.3f}, dist: {:.4f}, offset: {}, key: {}'
|
||||||
print(fname)
|
print(fname)
|
||||||
gr = LP.GuessPattern(data)
|
# gr = LP.GuessPattern(data)
|
||||||
for kl in range(3, 19):
|
for kl in range(3, 19):
|
||||||
for pattern_shift in range(1, kl):
|
for kl_shift in range(1, kl):
|
||||||
def fn_pattern_shift(x, kl): # shift by (more than) one, 012201120
|
|
||||||
for i in range(10000):
|
|
||||||
yield from x[(i * pattern_shift) % kl:]
|
|
||||||
yield from x[:(i * pattern_shift) % kl]
|
|
||||||
|
|
||||||
# Find proper pattern
|
# Find proper pattern
|
||||||
res = []
|
res = []
|
||||||
for offset in range(kl): # up to keylen offset
|
for offset in range(kl): # up to keylen offset
|
||||||
mask = LP.GuessPattern.pattern(kl, fn_pattern_shift)
|
parts = LP.GuessPattern.groups(data, kl, kl_shift, offset)
|
||||||
parts = gr.split(kl, mask, offset)
|
|
||||||
score = sum(LP.Probability(x).IC() for x in parts) / kl
|
score = sum(LP.Probability(x).IC() for x in parts) / kl
|
||||||
if score > 1.6 and score < 2.1:
|
if score > 1.6 and score < 2.1:
|
||||||
res.append((score, parts, offset))
|
res.append((score, parts, offset))
|
||||||
@@ -113,9 +101,9 @@ def pattern_solver(fname, irp=0):
|
|||||||
for score, parts, off in res:
|
for score, parts, off in res:
|
||||||
sc, key = LP.GuessPattern.guess(parts, fn_similarity)
|
sc, key = LP.GuessPattern.guess(parts, fn_similarity)
|
||||||
if sc < 0.1:
|
if sc < 0.1:
|
||||||
print(prnt_fmt.format(kl, pattern_shift, score, sc, off,
|
print(prnt_fmt.format(kl, kl_shift, score, sc, off,
|
||||||
LP.RuneText(key).text))
|
LP.RuneText(key).text))
|
||||||
solved = gr.zip(fn_pattern_shift(key, kl), off)
|
solved = LP.GuessPattern.zip(data, key, kl, kl_shift, off)
|
||||||
for i in whitespace_i:
|
for i in whitespace_i:
|
||||||
solved.insert(i, 29)
|
solved.insert(i, 29)
|
||||||
print(' ', LP.RuneText(solved).text)
|
print(' ', LP.RuneText(solved).text)
|
||||||
|
|||||||
Reference in New Issue
Block a user