refactor probability playground

.gitignore (vendored): 1 addition
@@ -2,3 +2,4 @@ __pycache__/
 other/ec-*.png
 other/ec-*.txt
 other/list-onions.txt
+out/

FailedAttempts.py (new executable file): 61 additions
@@ -0,0 +1,61 @@
#!/usr/bin/env python3
from RuneText import rune_map, RuneText
from NGrams import NGrams


#########################################
# NGramShifter : Shift rune-pairs in a fixed-width running window
#########################################

class NGramShifter(object):
    def __init__(self, gramsize=3):  # 3 is the only reasonable value though
        self.gramsize = gramsize
        self.prob = NGrams.load(gramsize)

    def ngram_probability_heatmap(self, data):
        gram_count = len(data) // self.gramsize
        ret = [[] for _ in range(gram_count)]  # ret[x][y] x: parts, y: shifts
        for y in range(29):
            variant = data - y
            for x in range(gram_count):
                i = x * self.gramsize
                gram = ''.join(r.rune for r in variant[i:i + self.gramsize])
                ret[x].append((y, self.prob.get(gram, 0), gram))
        # sort most probable first
        for arr in ret:
            arr.sort(key=lambda x: -x[1])  # (shift, probability)
        return ret

    def guess_single(self, data, interrupt_chr=None):
        data = RuneText(data)
        res = self.ngram_probability_heatmap(data)
        fillup = ' ' * (2 * self.gramsize + 1)
        all_interrupts = []
        if interrupt_chr:
            for i, x in enumerate(data):
                if x.rune == interrupt_chr:
                    all_interrupts.append(i)
        for y in range(29):  # each row in output
            line = ''
            for i, obj in enumerate(res):  # each column per row
                txt = ''
                if obj[y][1] > 0:
                    for u in range(self.gramsize):
                        if (i * self.gramsize + u) in all_interrupts:
                            txt += '|'  # mark with preceding
                        txt += rune_map[obj[y][2][u]]
                line += txt + fillup[len(txt):]
            line = line.rstrip()
            if line:
                print(line)

    def guess(self, data, interrupt_chr=None):
        data = RuneText(data)  # create RuneText once and reuse
        for i in range(self.gramsize):
            print('offset:', i)
            self.guess_single(data[i:], interrupt_chr)
            print()


# NGramShifter().guess('ᛈᚢᛟᚫᛈᚠᛖᚱᛋᛈᛈᚦᛗᚾᚪᚱᛚᚹᛈᛖᚩᛈᚢᛠᛁᛁᚻᛞᛚᛟᛠ', 'ᛟ')
# NGramShifter().guess([1, 2, 4, 5, 7, 9, 0, 12], 'ᛟ')
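
A minimal standalone sketch of the same shift-and-score window, assuming runes are already given as integers 0..28 and using a made-up trigram table in place of NGrams.load(3) (illustration only, not the RuneText-based code above):

# toy stand-in for the real trigram counts
toy_prob = {(0, 1, 2): 120, (3, 4, 5): 80}
data = [3, 4, 5, 7, 8, 9]  # ciphertext as rune indices

def heatmap(nums, prob, gramsize=3):
    cols = []
    for x in range(len(nums) // gramsize):
        i = x * gramsize
        scores = []
        for shift in range(29):
            gram = tuple((n - shift) % 29 for n in nums[i:i + gramsize])
            scores.append((shift, prob.get(gram, 0)))
        scores.sort(key=lambda s: -s[1])  # most probable shift first
        cols.append(scores)
    return cols

print([col[0] for col in heatmap(data, toy_prob)])  # [(3, 120), (4, 80)]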

HeuristicSearch.py (new executable file): 160 additions
@@ -0,0 +1,160 @@
#!/usr/bin/env python3
import itertools  # product, compress, combinations
import bisect  # bisect_left, insort


#########################################
# GuessVigenere : Shift values around with a given keylength
#########################################

class GuessVigenere(object):
    def __init__(self, nums):
        self.nums = nums

    def guess(self, keylength, score_fn):  # minimize score_fn
        found = []
        for offset in range(keylength):
            bi = -1
            bs = 9999999
            for i in range(29):
                shifted = [(x - i) % 29 for x in self.nums[offset::keylength]]
                score = score_fn(shifted)
                if score < bs:
                    bs = score
                    bi = i
            found.append(bi)
        return found


#########################################
# SearchInterrupt : Hill climbing algorithm for interrupt detection
#########################################

class SearchInterrupt(object):
    def __init__(self, arr, interrupt_chr):  # remove all whitespace in arr
        self.single_result = False  # if False, return list of equal likelihood
        self.full = arr
        self.stops = [i for i, n in enumerate(arr) if n == interrupt_chr]

    def to_occurrence_index(self, interrupts):
        return [self.stops.index(x) + 1 for x in interrupts]

    def join(self, interrupts=[]):  # rune positions, not occurrence index
        ret = []
        i = -1
        for x in interrupts:
            ret += self.full[i + 1:x]
            i = x
        return ret + self.full[i + 1:]

    # Go over the full string but only look at the first {maxdepth} interrupts.
    # Enumerate all possibilities and choose the one with the highest score.
    # If first interrupt is set, add it to the resulting set. If not, ignore it
    # Every iteration will add a single interrupt only, not the full set.
    def sequential(self, score_fn, startAt=0, maxdepth=9):
        found = [[]]

        def best_in_one(i, depth, prefix=[]):
            best_s = 0
            best_p = []  # [match, match, ...]
            irp = self.stops[i:i + depth]
            for x in itertools.product([False, True], repeat=depth):
                part = list(itertools.compress(irp, x))
                score = score_fn(self.join(prefix + part))
                if score >= best_s:
                    if score > best_s or self.single_result:
                        best_s = score
                        best_p = [part]
                    else:
                        best_p.append(part)
            return best_p, best_s

        def best_in_all(i, depth):
            best_s = 0
            best_p = []  # [(prefix, [match, match, ...]), ...]
            for pre in found:
                parts, score = best_in_one(i, depth, prefix=pre)
                if score >= best_s:
                    if score > best_s or self.single_result:
                        best_s = score
                        best_p = [(pre, parts)]
                    else:
                        best_p.append((pre, parts))
            return best_p, best_s

        # first step: move maxdepth-sized window over data
        i = startAt - 1  # in case loop isn't called
        for i in range(startAt, len(self.stops) - maxdepth):
            print('.', end='')
            parts, _ = best_in_all(i, maxdepth)
            found = []
            search = self.stops[i]
            for prfx, candidates in parts:
                bitSet = False
                bitNotSet = False
                for x in candidates:
                    if len(x) > 0 and x[0] == search:
                        bitSet = True
                    else:
                        bitNotSet = True
                    if bitSet and bitNotSet:
                        break
                if bitSet:
                    found.append(prfx + [search])
                if bitNotSet:
                    found.append(prfx)
        print('.')
        # last step: all permutations for the remaining (< maxdepth) bits
        i += 1
        remaining, score = best_in_all(i, min(maxdepth, len(self.stops) - i))
        found = [x + z for x, y in remaining for z in y]
        return score, found

    # Flip up to {maxdepth} bits anywhere in the full string.
    # Choose the bitset with the highest score and repeat.
    # If no better score found, increment number of testing bits and repeat.
    # Either start with all interrupts set (topDown) or none set.
    def genetic(self, score_fn, topDown=False, maxdepth=3):
        best = 0
        current = self.stops if topDown else []

        def evolve(lvl):
            for x in itertools.combinations(self.stops, lvl + 1):
                tmp = current[:]  # [x for x in current if x not in old]
                for y in x:
                    if y is None:
                        continue
                    elif y in current:
                        tmp.pop(bisect.bisect_left(tmp, y))
                    else:
                        bisect.insort(tmp, y)
                yield tmp, score_fn(self.join(tmp))
            if lvl > 0:
                yield from evolve(lvl - 1)

        best = score_fn(self.join())
        level = -1  # or start directly with maxdepth - 1
        while level < maxdepth:
            print('.', end='')
            update = None
            for interrupts, score in evolve(level):
                if score > best:
                    best = score
                    update = interrupts
            if update:
                current = update
                continue  # did optimize, so retry with same level
            level += 1
        print()
        # find equally likely candidates
        if self.single_result:
            return best, [current]
        all_of_them = [x for x, score in evolve(2) if score == best]
        all_of_them.append(current)
        return best, all_of_them


# a = GuessInterrupt([2, 0, 1, 0, 14, 15, 0, 13, 24, 25, 25, 25], 0)
# print(a.sequential(lambda x: (1.2 if len(x) == 11 else 0.1)))
# print(a.sequential(lambda x: (1.1 if len(x) == 10 else 0.1)))
# print(a.sequential(lambda x: (1.3 if len(x) == 9 else 0.1)))
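
For orientation, a small concrete run of the SearchInterrupt helpers above, in the spirit of the commented example at the end of the file (toy values; runes given as integers, 0 standing in for the interrupt rune):

from HeuristicSearch import SearchInterrupt

s = SearchInterrupt([2, 0, 1, 0, 14, 15, 0, 13], 0)
print(s.stops)                     # [1, 3, 6] -> positions of the interrupt rune
print(s.join([3]))                 # [2, 0, 1, 14, 15, 0, 13] -> drops position 3 only
print(s.to_occurrence_index([3]))  # [2] -> it was the 2nd occurrence that got skipped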

NGrams.py (new executable file): 63 additions
@@ -0,0 +1,63 @@
#!/usr/bin/env python3
import re
from RuneText import alphabet, RuneText


#########################################
# NGrams : loads and writes ngrams, also: translate English text to runes
#########################################

class NGrams(object):
    @staticmethod
    def translate(infile, outfile, stream=False):  # takes 10s
        with open(infile, 'r') as f:
            src = re.sub('[^A-Z]', '' if stream else ' ', f.read().upper())
        if stream:
            src = src.replace('\n', '')

        with open(outfile, 'w') as f:
            flag = False
            for r in RuneText.from_text(src):
                if r.kind != 'r':
                    if not flag:
                        f.write('\n')
                        flag = True
                    continue
                f.write(r.rune)
                flag = False

    @staticmethod
    def make(gramsize, infile, outfile):
        allowed_chr = [x[1] for x in alphabet]
        with open(infile, 'r') as f:
            data = re.sub('[^{}]'.format(''.join(allowed_chr)), '', f.read())

        res = {x: 0 for x in allowed_chr} if gramsize == 1 else {}
        for i in range(len(data) - gramsize + 1):
            ngram = data[i:i + gramsize]
            try:
                res[ngram] += 1
            except KeyError:
                res[ngram] = 1

        with open(outfile, 'w') as f:
            for x, y in sorted(res.items(), key=lambda x: -x[1]):
                f.write(f'{x} {y}\n')

    @staticmethod
    def load(ngram=1):
        ret = {}
        with open(f'data/p-{ngram}gram.txt', 'r') as f:
            for line in f.readlines():
                r, v = line.split()
                ret[r] = int(v)
        return ret


# NGrams.translate('data/baseline-text.txt', 'data/baseline-rune.txt', False)
# for i in range(1, 6):
#     print(f'generate {i}-gram file')
#     NGrams.make(i, infile='data/baseline-rune-words.txt',
#                 outfile=f'data/p-{i}gram.txt')
#     NGrams.make(i, infile='_solved.txt',
#                 outfile=f'data/p-solved-{i}gram.txt')
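
NGrams.make boils down to the sliding-window count below, written out as '<gram> <count>' lines that NGrams.load() parses back into a dict; the rune string and resulting counts here are invented for illustration:

data = 'ᚠᚢᚦᚠᚢᚦᚠ'  # hypothetical rune stream
gramsize = 2
res = {}
for i in range(len(data) - gramsize + 1):
    gram = data[i:i + gramsize]
    res[gram] = res.get(gram, 0) + 1
print(sorted(res.items(), key=lambda x: -x[1]))  # [('ᚠᚢ', 2), ('ᚢᚦ', 2), ('ᚦᚠ', 2)]
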
@@ -25,3 +25,5 @@
 ᛉ 5
 ᛄ 3
 ᚫ 2
+ᛇ 0
+ᛟ 0

@@ -1,29 +0,0 @@
-ᛖ 380
-ᚩ 256
-ᚪ 217
-ᛋ 199
-ᛏ 196
-ᚱ 192
-ᛁ 184
-ᚾ 181
-ᚢ 153
-ᛞ 117
-ᚦ 115
-ᛚ 109
-ᚹ 98
-ᚳ 91
-ᚻ 90
-ᚣ 79
-ᛗ 76
-ᚠ 48
-ᛈ 41
-ᚷ 40
-ᛒ 40
-ᛝ 31
-ᛠ 20
-ᛡ 16
-ᛉ 5
-ᛄ 3
-ᚫ 2
-ᛇ 0
-ᛟ 0

@@ -37,7 +37,7 @@ txt = '''
 '''

 rr = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwx'
-rr = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwx1234567890'
+# rr = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwx1234567890'

 for base in range(58, 64):
     t = ''
@@ -49,6 +49,7 @@ for base in range(58, 64):
         # n = rr.index(x[0]) * len(rr) + rr.index(x[1])
         # n = int(x, 36)
         # t += '{},'.format(n)
-        tt += chr(n)
+        # tt += chr(n)
+        tt += '{}{}'.format(rr[n // 16], rr[n % 16])
     print(t)
     print(tt)
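
The replacement line in the hunk above renders each value n as two characters picked from rr by its base-16 digits; since rr begins with '0123456789ABCDEF', any n below 256 simply comes out as its hex digits. A quick check, assuming the rr shown above:

rr = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwx'
n = 75
print(rr[n // 16] + rr[n % 16])  # '4B', and 0x4B == 75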

probability.py: 435 changed lines
@@ -1,206 +1,58 @@
 #!/usr/bin/env python3
-import math
 import re
 from RuneSolver import VigenereSolver
-from RuneText import Rune, RuneText
+from RuneText import RuneText
+from NGrams import NGrams
+from HeuristicSearch import GuessVigenere, SearchInterrupt
+# from FailedAttempts import NGramShifter

 RUNES = 'ᚠᚢᚦᚩᚱᚳᚷᚹᚻᚾᛁᛄᛇᛈᛉᛋᛏᛒᛖᛗᛚᛝᛟᛞᚪᚫᚣᛡᛠ'
 RCOUNT = len(RUNES)
-ORG_INTERRUPT = RUNES.index('ᚠ')
+ORG_INTERRUPT = 'ᚠ'
+INV_INTERRUPT = RUNES.index(ORG_INTERRUPT)
 INVERT = False
-INV_INTERRUPT = (28 - ORG_INTERRUPT) if INVERT else ORG_INTERRUPT
-LOOK_AHEAD = 9  # look ahead
-APPEND_REMAINING = False  # should it incl. text past the look ahead?
+if INVERT:
+    INV_INTERRUPT = 28 - INV_INTERRUPT
 re_norune = re.compile('[^' + RUNES + ']')


-def main():
-    # BaselineProbability.translate()
-    # BaselineProbability.make('data/p-solved.txt', infile='_solved.txt')
-    # BaselineProbability.make('data/p-1gram.txt', 1)
-    # for i in range(1, 6):
-    #     print(f'generate {i}-gram file')
-    #     BaselineProbability.make(
-    #         f'data/p-{i}gram.txt', i, infile='data/baseline-rune-words.txt')
-    #     BaselineProbability.make(
-    #         f'data/p-solved-{i}gram.txt', i, infile='_solved.txt')
-    # exit()
-
-    for fname in [
-        # '0_welcome',  # V8
-        # 'jpg107-167',  # V13
-        # '0_warning',  # invert
-        # '0_wisdom',  # plain
-        # 'p0-2',  # ???
-        # 'p3-7',  # ???
-        # 'p8-14',  # ??? -> kl 11? or 12?
-        # 'p15-22',  # ???
-        # 'p23-26',  # ???
-        # 'p27-32',  # ???
-        # 'p33-39',  # ???
-        # 'p40-53',  # ???
-        'p54-55',  # ???
-    ]:
-        data = load_data(fname)
-        # NGramShifter(data).try_all()
-        # print(VigenereBreaker(data).guess(8, [4,5,6,7,10,11,14,18,20,21,25]))
-        # print(VigenereBreaker(data).guess(13, [2, 3]))
-        # continue
-        if False:
-            # TODO: add some logic for two keys alternation
-            bst, kall = test_keylength(data[0::2], kmax=20, wInterrupt=True)
-            print('best estimate: keylength: {}, score: {:.4f}'.format(*bst))
-            # decrypt_to(kall, fname, '.0')
-            bst, kall = test_keylength(data[1::2], kmax=20, wInterrupt=True)
-            print('best estimate: keylength: {}, score: {:.4f}'.format(*bst))
-            # decrypt_to(kall, fname, '.1')
-        else:
-            bst, kall = test_keylength(data, kmin=1, kmax=32, start=1, wInterrupt=True)
-            print('best estimate: keylength: {}, score: {:.4f}'.format(*bst))
-            decrypt_to(kall, fname)


 def load_data(fname):
     fname = 'pages/{}.txt'.format(fname)
     print()
     print('loading file:', fname)
     with open(fname, 'r') as f:
-        data = RuneText(re_norune.sub('', f.read()))
-    data = [(28 - x).index if INVERT else x.index for x in data]
+        data = RuneText(re_norune.sub('', f.read()))['index']
+    if INVERT:
+        data = [28 - x for x in data]
     return data


-def decrypt_to(variants, infile, prfx=''):
-    slvr = VigenereSolver()
-    slvr.input.load(file=f'pages/{infile}.txt')
-    slvr.output.QUIET = True
-    slvr.output.COLORS = False
-    slvr.INTERRUPT = RUNES[ORG_INTERRUPT]
-    slvr.KEY_INVERT = INVERT
-    for kl, score, intrpts, key_guess in variants:
-        outfile = f'out/{infile}.{kl}{prfx}.txt'
-        with open(outfile, 'w') as f:
-            f.write(f'{kl}, {score:.4f}, {key_guess}, {intrpts}\n')
-        slvr.output.file_output = outfile
-        slvr.INTERRUPT_POS = intrpts
-        slvr.KEY_DATA = key_guess
-        slvr.run()
-
-
-def test_keylength(nums, kmin=1, kmax=32, start=1, wInterrupt=False):
-    best_score = 0
-    best_kl = 0
-    ret = []
-    for kl in range(kmin, kmax + 1):
-        if wInterrupt:
-            score, intrpts = BinTest(nums, kl).test(start=start)
-        else:
-            score = Probability.IC_w_keylen(nums, kl)
-            intrpts = []
-
-        print('{} {:.4f}'.format(kl, score))
-        print(' jump:', intrpts)
-        key_guess = VigenereBreaker(nums).guess(kl, intrpts)
-        print(' key:', key_guess)
-        ret.append((kl, score, intrpts, key_guess))
-
-        if score > best_score:
-            best_score = score
-            best_kl = kl
-    return (best_kl, best_score), ret
-
-
-#########################################
-# BaselineProbability : loads and writes ngrams
-#########################################
-
-class BaselineProbability(object):
-    @staticmethod
-    def translate():  # takes 10s
-        with open('data/baseline-text.txt', 'r') as f:
-            src = re.sub('[^A-Z]', ' ', f.read().upper())
-            # src.replace('\n', '')
-
-        with open('data/baseline-rune.txt', 'w') as f:
-            flag = False
-            for r in RuneText.from_text(src):
-                if r.kind != 'r':
-                    if not flag:
-                        f.write('\n')
-                        flag = True
-                    continue
-                f.write(r.rune)
-                flag = False
-
-    @staticmethod
-    def make(outfile, gramsize=1, infile='data/baseline-rune.txt'):
-        res = {x: 0 for x in RUNES}
-        for x in range(gramsize - 1):
-            res = {x + y: 0 for x in RUNES for y in res.keys()}
-        with open(infile, 'r') as f:
-            data = re_norune.sub('', f.read())
-            for i in range(len(data) - (gramsize - 1)):
-                ngram = data[i:i + gramsize]
-                res[ngram] += 1
-        with open(outfile, 'w') as f:
-            for x, y in sorted(res.items(), key=lambda x: -x[1]):
-                if y != 0:
-                    f.write(f'{x} {y}\n')
-
-    @staticmethod
-    def load_ngram(gram=2):
-        ret = {}
-        with open(f'data/p-{gram}gram.txt', 'r') as f:
-            for line in f.readlines():
-                r, v = line.split()
-                ret[r] = int(v)
-        return ret
-
-    @staticmethod
-    def load():
-        with open('data/p-1gram.txt', 'r') as f:
-            lines = f.readlines()
-        ret = [0] * RCOUNT
-        for line in lines:
-            r, v = line.split()
-            ret[RUNES.index(r)] = int(v)
-        return ret
-
-
 #########################################
 # Probability : Count runes and simple frequency analysis
 #########################################

 class Probability(object):
-    def __init__(self, arr):
-        self.prob = Probability.count(arr)
-        self.N = len(arr)
+    def __init__(self, numstream):
+        self.prob = [0] * RCOUNT
+        for r in numstream:
+            self.prob[r] += 1
+        self.N = len(numstream)

     def IC(self):
-        X = sum([x * (x - 1) for x in self.prob])
+        X = sum(x * (x - 1) for x in self.prob)
         return X / ((self.N * (self.N - 1)) / 29)

     def friedman(self):
         return (K_p - K_r) / (self.IC() - K_r)

     def similarity(self):
-        probs = Probability.to_log(self.prob)
-        return sum((PROB_BASELINE[i] - probs[i]) ** 2 for i in range(RCOUNT))
+        probs = Probability.normalized(self.prob)
+        return sum((x - y) ** 2 for x, y in zip(PROB_NORM, probs))

     @staticmethod
-    def count(nums):
-        res = [0] * RCOUNT
-        for r in nums:
-            res[r] += 1
-        return res
-
-    @staticmethod
-    def to_log(int_prob):
+    def normalized(int_prob):
         total = sum(int_prob)
-        for i, v in enumerate(int_prob):
-            int_prob[i] = v / total
-            # int_prob[i] = math.log(v / total, 10)
-        return int_prob
+        return [x / total for x in int_prob]  # math.log(x / total, 10)

     @staticmethod
     def IC_w_keylen(nums, keylen):
@@ -209,193 +61,88 @@ class Probability(object):


 #########################################
-# BinTest : Split text into Vigenere columns and apply frequency anlysis
+# Perform heuristic search on the keylength, interrupts, and key
 #########################################

-class BinTest(object):
-    def __init__(self, nums, keylength):
-        self.keylength = keylength
-        self.intrpts = [-1]
-        self.parts = []
-        for i, n in enumerate(nums):
-            if n != INV_INTERRUPT:
-                continue
-            self.parts.append(nums[self.intrpts[-1] + 1:i])  # drop ᚠ
-            self.intrpts.append(i)
-        self.parts.append(nums[self.intrpts[-1] + 1:])  # remainder
-        self.previous = self.parts[0]
-
-    def permutations(self, index, maxdepth=LOOK_AHEAD):
-        ret = [self.previous]
-        i = maxdepth
-        for part in self.parts[index:]:
-            tmp = []
-            for x in ret:
-                tmp.append(x + [INV_INTERRUPT] + part)
-                tmp.append(x + part)  # + INV_INTERRUPT
-                # TODO: properly append INV_INTERRUPT
-                # ommitting a rune will slightly favor the shorter text
-                # however, adding it at the end will shift all remaining runes
-            ret = tmp
-            i -= 1
-            if i <= 0:
-                if APPEND_REMAINING:
-                    remainder = []
-                    for z in self.parts[index + maxdepth:]:
-                        remainder.extend([INV_INTERRUPT] + z)
-                    for u in range(len(ret)):
-                        ret[u].extend(remainder)
-                break
-        return ret
-
-    def best_permutation(self, start, maxdepth=LOOK_AHEAD, oneShot=False):
-        # TODO: better algorithm to select interrupts
-        permutations = self.permutations(start, maxdepth=maxdepth)
-        best_i = 0
-        best_score = 0
-        # try all permutations for the next x interrupts
-        for p_i, p in enumerate(permutations):
-            score = Probability.IC_w_keylen(p, self.keylength)
-            if score > best_score:
-                best_score = score
-                best_i = p_i
-        if oneShot:
-            # permutations without interrupt are appended first
-            # since we only care about the first char, i >= len/2 is sufficient
-            is_interrupt = best_i >= len(permutations) / 2
-            return best_score, is_interrupt
-        else:
-            found = []
-            mi = int(math.log(len(permutations), 2))
-            for i in range(mi):
-                if best_i & (1 << (mi - i)):
-                    found.append(i + start - 1)
-            return best_score, found
-
-    def join_parts(self, end=None):
-        ret = []
-        for part in self.parts[:end]:
-            ret.append(INV_INTERRUPT)
-            ret.extend(part)
-        return ret[1:]
-
-    def test(self, start=1):
-        if start > 1:
-            if start >= len(self.parts):
-                start = len(self.parts) - 1
-            self.previous = self.join_parts(self.intrpts[start])
-        # # enum all possible permutation. But only once
-        # return self.best_permutation(start=start, maxdepth=12, oneShot=True)
-        # # calculate IoC without interrupts
-        # return Probability.IC_w_keylen(self.join_parts(), self.keylength), []
-        if start >= len(self.intrpts):
-            return Probability.IC_w_keylen(self.previous, self.keylength), []
-
-        found = []
-        best = 0
-        for i in range(start, len(self.intrpts)):
-            score, is_interrupt = self.best_permutation(i)
-            if score > best:
-                best = score
-            if is_interrupt:
-                found.append(i)
-            else:
-                self.previous += [INV_INTERRUPT]
-            self.previous.extend(self.parts[i])
-        return best, found
-
-
-#########################################
-# VigenereBreaker : Given a fixed keylength, shift values around
-#########################################
-
-class VigenereBreaker(object):
-    def __init__(self, nums):
-        self.nums = nums
-
-    def guess(self, keylength, interrupts=[]):
-        intup = 0
-        ii = 0
-        bins = [[] for _ in range(keylength)]
-        for i, n in enumerate(self.nums):
-            if n == INV_INTERRUPT:
-                intup += 1
-                if intup in interrupts:
-                    continue
-            bins[ii % keylength].append(n)
-            ii += 1
-        found = []
-        for data in bins:
-            shifted = [[] for _ in range(29)]
-            for x in data:
-                for i in range(29):
-                    shifted[i].append((x - i) % 29)
-            bi = -1
-            bs = 9999999
-            for i, test in enumerate(shifted):
-                score = Probability(test).similarity()
-                if score < bs:
-                    bs = score
-                    bi = i
-            found.append(bi)
-        return found
-
-
-#########################################
-# NGramShifter : Shift fixed with runes around
-#########################################
-
-class NGramShifter(object):
-    def __init__(self, data):
-        self.data = data
-        self.variants = [''.join(RUNES[(y - x) % 29] for y in data)
-                         for x in range(29)]
-
-    def try_all(self, gramsize=3):
-        for i in range(gramsize):
-            print('offset:', i)
-            NGramShifter(self.data[i:]).guess(gramsize)
-            print()
-
-    def guess(self, keylength, interrupts=[]):
-        prob = BaselineProbability.load_ngram(keylength)
-        maxlen = len(self.data) - len(self.data) % keylength
-        res = [[] for _ in range(maxlen // keylength)]
-        for v, data in enumerate(self.variants):
-            for i in range(0, maxlen, keylength):
-                gram = data[i:i + keylength]
-                try:
-                    value = prob[gram]
-                except KeyError:
-                    value = 0
-                res[i // keylength].append((v, value))
-        for arr in res:
-            arr.sort(key=lambda x: -x[1])
-        fillup = ' ' * (2 * keylength + 1)
-        interrupts = [i for i, x in enumerate(self.data) if x == INV_INTERRUPT]
-        for i in range(29):
-            txt = ''
-            for u, x in enumerate(res):
-                u *= keylength
-                tt = ''
-                if x[i][1] > 0:
-                    for o in range(u, u + keylength):
-                        if o in interrupts:
-                            tt += '|'  # mark with preceding
-                        tt += Rune(r=self.variants[x[i][0]][o]).text
-                txt += tt + fillup[len(tt):]
-            txt = txt.rstrip()
-            if txt:
-                print(txt)
+def enum_keylengths(nums, fn_interrupt, fn_keyguess, kmin=1, kmax=32):
+    best_s = 0
+    best_kl = 0
+    iguess = SearchInterrupt(nums, INV_INTERRUPT)
+    print('interrupt:', ORG_INTERRUPT, 'count:', len(iguess.stops))
+    for kl in range(kmin, kmax + 1):
+        score, intrpts = fn_interrupt(kl, iguess)
+        print('{} {:.4f}'.format(kl, score))
+        key_guess = []
+        for i, skips in enumerate(intrpts):
+            key = fn_keyguess(kl, iguess.join(skips))
+            yield kl, score, i, skips, key
+            key_guess.append(key)
+            intrpts[i] = iguess.to_occurrence_index(skips)
+        print(' skip:', intrpts)
+        print(' key:', key_guess)
+        if score > best_s:
+            best_s = score
+            best_kl = kl
+    print(f'best estimate: keylength: {best_kl}, score: {best_s:.4f}')
+
+
+def fn_break_vigenere(fname, data):
+    def fn_similarity(x):
+        return Probability(x).similarity()
+
+    def fn_irp(kl, iguess):
+        def fn_IoC(x):
+            return Probability.IC_w_keylen(x, kl)
+        return iguess.sequential(fn_IoC, startAt=0, maxdepth=9)
+        # return iguess.genetic(fn_IoC, topDown=False, maxdepth=4)
+        # return fn_IoC(iguess.join()), [[]]  # without interrupts
+
+    def fn_key(kl, data):
+        return GuessVigenere(data).guess(kl, fn_similarity)
+
+    slvr = VigenereSolver()
+    slvr.input.load(file=f'pages/{fname}.txt')
+    slvr.output.QUIET = True
+    slvr.output.COLORS = False
+    slvr.INTERRUPT = ORG_INTERRUPT
+    slvr.KEY_INVERT = INVERT
+    for kl, score, i, skips, key in enum_keylengths(data, fn_irp, fn_key,
+                                                    kmin=1, kmax=32):
+        outfile = f'out/{fname}.{score:.3f}.{kl}.{i}.txt'
+        with open(outfile, 'w') as f:
+            f.write(f'{kl}, {score:.4f}, {key}, {skips}\n')
+        slvr.output.file_output = outfile
+        slvr.INTERRUPT_POS = skips
+        slvr.KEY_DATA = key
+        slvr.run()


 #########################################
 # main
 #########################################

-PROB_BASELINE = Probability.to_log(BaselineProbability.load())
+PROB_INT = [0] * RCOUNT
+for k, v in NGrams.load().items():
+    PROB_INT[RUNES.index(k)] = v
+PROB_NORM = Probability.normalized(PROB_INT)
 K_r = 1 / 29  # 0.034482758620689655
-K_p = sum([x ** 2 for x in PROB_BASELINE])  # 0.06116195419412538
+K_p = sum(x ** 2 for x in PROB_INT)  # 0.06116195419412538

-if __name__ == '__main__':
-    main()
+for fname in [
+        # '0_welcome',  # V8
+        # 'jpg107-167',  # V13
+        # '0_warning',  # invert
+        # '0_wisdom',  # plain
+        # 'p0-2',  # ???
+        # 'p3-7',  # ???
+        # 'p8-14',  # ??? -> kl 11? or 12?
+        # 'p15-22',  # ???
+        # 'p23-26',  # ???
+        # 'p27-32',  # ???
+        # 'p33-39',  # ???
+        # 'p40-53',  # ???
+        'p54-55',  # ???
+]:
+    data = load_data(fname)
+    # NGramShifter().guess(data, RUNES[INV_INTERRUPT])
+    fn_break_vigenere(fname, data)
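
The keylength and interrupt search above is driven by the index of coincidence from Probability.IC / IC_w_keylen, i.e. sum(f * (f - 1)) / (N * (N - 1) / 29), scaled so uniformly distributed runes score near 1.0 while text matching the baseline distribution scores near K_p / K_r, roughly 0.0612 / 0.0345 = 1.77. A tiny standalone check with invented counts:

counts = [100] * 29  # perfectly flat, i.e. random-looking
N = sum(counts)
print(sum(x * (x - 1) for x in counts) / ((N * (N - 1)) / 29))  # ~0.99

counts = [40] * 5 + [5] * 24  # heavily skewed, i.e. language-like
N = sum(counts)
print(sum(x * (x - 1) for x in counts) / ((N * (N - 1)) / 29))  # ~2.35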