761
This commit is contained in:
344
other/761/wav.py
Executable file
344
other/761/wav.py
Executable file
@@ -0,0 +1,344 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import wave
|
||||
import struct
|
||||
|
||||
|
||||
TRACK_LEN = 146.468
|
||||
SAMPLE_LEN = 6459264 # hard coded so we dont need to load the file
|
||||
SAMPLING = 8 # take every x-th frame
|
||||
SMOOTH_WINDOW = 2 # gaussian window size X +- window
|
||||
END_TIMES = [
|
||||
10.639, 15.914, 28.937, 32.239, 33.590, 38.875, 42.408,
|
||||
45.919, 49.475, 54.763, 72.301, 74.172, 81.219, 82.339,
|
||||
92.900, 99.753, 100.654, 105.919, 114.771, 146.468]
|
||||
|
||||
|
||||
def flip_bits(bits):
|
||||
return bits.replace('1', '_').replace('0', '1').replace('_', '0')
|
||||
|
||||
|
||||
def bin_to_hex(binary_str):
|
||||
ret = ''
|
||||
for i in range(0, len(binary_str), 8):
|
||||
ret += '{:02X}'.format(int(binary_str[i:i + 8], 2))
|
||||
return ret
|
||||
|
||||
|
||||
def bin_to_text(binary_str):
|
||||
ret = ''
|
||||
for i in range(0, len(binary_str), 8):
|
||||
ret += chr(int(binary_str[i:i + 8], 2))
|
||||
return ret
|
||||
|
||||
|
||||
def oneChannel(fname, chanIdx, maxread=None):
|
||||
f = wave.open(fname, 'rb')
|
||||
c_chn = f.getnchannels()
|
||||
c_frm = f.getnframes()
|
||||
if maxread:
|
||||
c_frm = min(maxread, c_frm)
|
||||
assert f.getsampwidth() == 2
|
||||
s = f.readframes(c_frm)
|
||||
f.close()
|
||||
unpstr = '<{0}h'.format(c_frm * c_chn)
|
||||
x = list(struct.unpack(unpstr, s))
|
||||
return x[chanIdx::c_chn]
|
||||
|
||||
|
||||
def find_db_peaks(wav_filename, threshold, write_to=None):
|
||||
res = oneChannel(wav_filename, 1) # 100000
|
||||
if len(res) != SAMPLE_LEN:
|
||||
print('WARN: file sample rate mismatch with SAMPLE_LEN')
|
||||
with open(write_to, 'wb') as fo:
|
||||
# apply a rough gaussian smoothing
|
||||
ftlr_rng = range(-SMOOTH_WINDOW, SMOOTH_WINDOW + 1)
|
||||
for i in range(SMOOTH_WINDOW, len(res) - SMOOTH_WINDOW, SAMPLING):
|
||||
z = [res[i + x] * (1 / (abs(x) + 1)) for x in ftlr_rng]
|
||||
z = sum(z)
|
||||
f = abs(z) > 400 # threshold
|
||||
fo.write(b'\xFF' if f else b'\x00')
|
||||
|
||||
|
||||
def fill_gaps(fname, window_size, min_count, threshold=128, write_to=None):
|
||||
window = [0] * window_size
|
||||
with open(write_to, 'wb') as fo:
|
||||
with open(fname, 'rb') as fi:
|
||||
for x in fi.read():
|
||||
window.pop(0)
|
||||
window.append(1 if x > threshold else 0)
|
||||
f = sum(window) > min_count
|
||||
fo.write(b'\xFF' if f else b'\x00')
|
||||
|
||||
|
||||
def find_db_change(fname, threshold=128, write_to=None):
|
||||
res = [(0, False)]
|
||||
prev = False
|
||||
with open(fname, 'rb') as fi:
|
||||
for i, x in enumerate(fi.read()):
|
||||
f = x > threshold
|
||||
if f != prev:
|
||||
prev = f
|
||||
res.append((i, f))
|
||||
with open(write_to, 'w') as fo:
|
||||
for x in res:
|
||||
fo.write('{}: {}\n'.format(*x))
|
||||
# fo.write('\n'.join(['{}: {}'.format(*x) for x in res]))
|
||||
|
||||
|
||||
def find_signal_midpoints(fname):
|
||||
res = [] # (pos, width, dist_to_prev)
|
||||
prev = 0
|
||||
with open(fname, 'r') as fi:
|
||||
lines = fi.readlines()
|
||||
for x, y in zip(lines[1::2], lines[2::2]):
|
||||
x = int(x.split(':')[0])
|
||||
y = int(y.split(':')[0])
|
||||
w = y - x
|
||||
x += int(w / 2) # center point
|
||||
res.append((x, w, x - prev))
|
||||
prev = x
|
||||
return res
|
||||
|
||||
|
||||
def analyze_midpoints(midpoints_list, min_frames):
|
||||
res = [] # (frame-no, time, dist-to-prev, type 'S-M-E')
|
||||
typ = 'E' # marks first as [S]tart
|
||||
for x, width, dist in midpoints_list:
|
||||
if width < min_frames:
|
||||
continue
|
||||
typ = 'S' if typ == 'E' else 'M'
|
||||
x *= SAMPLING
|
||||
at_time = x / SAMPLE_LEN * TRACK_LEN
|
||||
dist *= SAMPLING
|
||||
for i, end_time in enumerate(END_TIMES):
|
||||
if abs(end_time - at_time) < 0.100: # accurate within 100 ms
|
||||
typ = 'E'
|
||||
del END_TIMES[i] # keeps count if all are used up
|
||||
break
|
||||
res.append((x, at_time, dist, typ))
|
||||
if len(END_TIMES) > 0:
|
||||
if END_TIMES[0] > res[-1][1]:
|
||||
for x in END_TIMES:
|
||||
fn = round(x / TRACK_LEN * SAMPLE_LEN)
|
||||
res.append((fn, x, fn - res[-1][0], 'E'))
|
||||
else:
|
||||
print('These endpoints were not found:')
|
||||
print(END_TIMES) # double check
|
||||
return res
|
||||
|
||||
|
||||
def find_common_frame_dist(arr):
|
||||
arr = [x[2] for x in arr if x[3] != 'S']
|
||||
min_dist = min(arr)
|
||||
print('Smallest common divisor: {}'.format(min_dist))
|
||||
best_match = min_dist
|
||||
best_sum = 999999
|
||||
for tx in range(min_dist - 200, min_dist + 200 + 1):
|
||||
subsum = 0
|
||||
for x in arr:
|
||||
x /= tx
|
||||
x -= round(x)
|
||||
subsum += x * x # least square distance
|
||||
if subsum < best_sum:
|
||||
best_sum = subsum
|
||||
best_match = tx
|
||||
print('Best matching frame dist: {}'.format(best_match))
|
||||
return best_match
|
||||
|
||||
|
||||
def analyze_db_peaks(wav_file, force=False):
|
||||
print('761')
|
||||
print('===')
|
||||
print('Track length:', TRACK_LEN)
|
||||
print('Total frames:', SAMPLE_LEN)
|
||||
if not os.path.isdir('tmp'):
|
||||
os.mkdir('tmp')
|
||||
tmp1 = 'tmp/wav-peak-analysis_1.dat'
|
||||
tmp2 = 'tmp/wav-peak-analysis_2.dat'
|
||||
tmp3 = 'tmp/wav-peak-analysis_3.txt'
|
||||
|
||||
if force or not os.path.isfile(tmp1):
|
||||
find_db_peaks(wav_file, 400, write_to=tmp1)
|
||||
|
||||
if force or not os.path.isfile(tmp2):
|
||||
fill_gaps(tmp1, window_size=80, min_count=20, write_to=tmp2)
|
||||
|
||||
# force = True
|
||||
if force or not os.path.isfile(tmp3):
|
||||
find_db_change(tmp2, write_to=tmp3)
|
||||
|
||||
points = find_signal_midpoints(tmp3)
|
||||
points = analyze_midpoints(points, min_frames=10)
|
||||
freq = find_common_frame_dist(points)
|
||||
# if times between 96.68-96.79 and 70.10-70.21 are sampled differently
|
||||
# freq /= 2 # use *2 or /2 to decrease or increase sampling frequency
|
||||
|
||||
print('''
|
||||
The columns are as follows:
|
||||
Type Time(s) Time(frame) dist-to-prev
|
||||
|
||||
- Type is one of [S]tart point, [M]id-point, or [E]nd point
|
||||
- dist-to-prev is frame distance to previous signal divided by frame-dist
|
||||
''')
|
||||
|
||||
bits = ['']
|
||||
nums = [[]]
|
||||
t_between = []
|
||||
t_lengths = []
|
||||
since_start = 0
|
||||
|
||||
for x, at, dist, typ in points:
|
||||
def time_diff_tpl(diff):
|
||||
return (round(diff / freq), diff / SAMPLE_LEN * TRACK_LEN)
|
||||
|
||||
in_samples = round(dist / freq)
|
||||
print('{} {:.2f} {} {}'.format(typ, at, x, in_samples))
|
||||
if typ == 'S':
|
||||
# bits[-1] += '0' * (in_samples - 1) # consider space between
|
||||
bits[-1] += '1'
|
||||
t_between.append(time_diff_tpl(dist))
|
||||
since_start = 0
|
||||
elif typ == 'E':
|
||||
bits[-1] += '0' * (in_samples - 1)
|
||||
bits[-1] += '0' # or 1?
|
||||
missing_bits = 8 - len(bits[-1]) % 8
|
||||
if missing_bits != 8:
|
||||
# bits[-1] = '0' * missing_bits + bits[-1]
|
||||
bits[-1] += '0' * missing_bits
|
||||
|
||||
since_start += dist
|
||||
t_lengths.append(time_diff_tpl(since_start))
|
||||
bits.append('')
|
||||
nums[-1].append(in_samples)
|
||||
nums.append([])
|
||||
else:
|
||||
since_start += dist
|
||||
bits[-1] += '0' * (in_samples - 1)
|
||||
bits[-1] += '1'
|
||||
nums[-1].append(in_samples)
|
||||
if bits[-1] == '':
|
||||
del bits[-1]
|
||||
if not nums[-1]:
|
||||
del nums[-1]
|
||||
print()
|
||||
|
||||
print('Distance between transmissions:')
|
||||
print(', '.join(['{} ({:.2f}s)'.format(x, y) for x, y in t_between]))
|
||||
print()
|
||||
|
||||
print('Lengths of transmission:')
|
||||
print(', '.join(['{} ({:.2f}s)'.format(x, y) for x, y in t_lengths]))
|
||||
print()
|
||||
|
||||
print('Individual signals:')
|
||||
for i, x in enumerate(nums):
|
||||
print(' {:2}: {}'.format(i, x))
|
||||
print()
|
||||
|
||||
print('Individual signals (total time):')
|
||||
for i, x in enumerate(nums):
|
||||
r = [0]
|
||||
for n in x:
|
||||
r.append(r[-1] + n)
|
||||
print(' {:2}: {}'.format(i, r[1:]))
|
||||
print()
|
||||
|
||||
print('''
|
||||
The following assumes that each transmission:
|
||||
- begins with a 1 bit
|
||||
- end is always a 0 bit
|
||||
- midpoints are '0' * (dist-to-prev - 1) + '1'
|
||||
- no counting in-between transmissions
|
||||
|
||||
Here is a representation of the individual transmissions,
|
||||
as well as the full string at the end. Results are:
|
||||
|
||||
(0): signals are 1 bit, read left-to-right
|
||||
(1): reverse bit order (aka. read right-to-left)
|
||||
(2): as (0) but with inverted bits
|
||||
(3): reversed and inverted
|
||||
|
||||
Interpreting individual transmissions:
|
||||
''')
|
||||
|
||||
def print_arr_w_alternates(bits, fn):
|
||||
print('0\n{}'.format([fn(x) for x in bits]))
|
||||
print('1\n{}'.format([fn(x[::-1]) for x in bits]))
|
||||
print('2\n{}'.format([fn(flip_bits(x)) for x in bits]))
|
||||
print('3\n{}'.format([fn(flip_bits(x)[::-1]) for x in bits]))
|
||||
print()
|
||||
|
||||
def print_str_w_alternates(bits, fn):
|
||||
not_bits = flip_bits(bits)
|
||||
print('0: {}'.format(fn(bits)))
|
||||
print('1: {}'.format(fn(bits[::-1])))
|
||||
print('2: {}'.format(fn(not_bits)))
|
||||
print('3: {}'.format(fn(not_bits[::-1])))
|
||||
print()
|
||||
|
||||
# print('As numbers:')
|
||||
# print_arr_w_alternates(bits, lambda x: int(x, 2))
|
||||
# print('As binary:')
|
||||
# print_arr_w_alternates(bits, lambda x: x)
|
||||
# print('As hex:')
|
||||
# print_arr_w_alternates(bits, lambda x: bin_to_hex(x))
|
||||
print('As text:')
|
||||
print_arr_w_alternates(bits, lambda x: bin_to_text(x))
|
||||
|
||||
print('Interpreting as a whole:')
|
||||
print()
|
||||
concat = ''.join([x for x in bits])
|
||||
# print('As numbers:')
|
||||
# print_str_w_alternates(concat, lambda x: int(x, 2))
|
||||
# print('As binary:')
|
||||
# print_str_w_alternates(concat, lambda x: x)
|
||||
# print('As hex:')
|
||||
# print_str_w_alternates(concat, lambda x: bin_to_hex(x))
|
||||
print('As text:')
|
||||
print_str_w_alternates(concat, lambda x: bin_to_text(x))
|
||||
|
||||
|
||||
# Least Significant Bit Analysis
|
||||
# https://medium.com/analytics-vidhya/get-secret-message-from-audio-file-8769421205c3
|
||||
def analyze_lsb(wav_filename):
|
||||
obj = wave.open(wav_filename, 'rb')
|
||||
# print(obj.getparams())
|
||||
fcount = obj.getnframes()
|
||||
fcount = 1000
|
||||
bytes = bytearray(list(obj.readframes(fcount)))
|
||||
obj.close()
|
||||
bytes = struct.unpack('H' * (len(bytes) // 2), bytes)
|
||||
# if not os.path.isdir('tmp'):
|
||||
# os.mkdir('tmp')
|
||||
|
||||
# Every frame LSB
|
||||
for z in range(1, 2):
|
||||
for u in range(z):
|
||||
txt = ''
|
||||
for i in range(u, len(bytes), z):
|
||||
f = bytes[i] & (1 << 0)
|
||||
txt += '1' if f else '0'
|
||||
# txt += chr(bytes[i])
|
||||
# print(txt)
|
||||
print(bin_to_text(txt))
|
||||
|
||||
# Alternating frame LSB
|
||||
# left = bytes[::2]
|
||||
# right = bytes[1::2]
|
||||
# for z in range(1, 2):
|
||||
# for u in range(z):
|
||||
# txt = ''
|
||||
# for i in range(u, len(left), 2):
|
||||
# if i % 2 == 0:
|
||||
# txt += str(left[i] & 1)
|
||||
# # txt += chr(left[i])
|
||||
# else:
|
||||
# txt += str(right[i] & 1)
|
||||
# # txt += chr(right[i])
|
||||
# # print(txt)
|
||||
# print(bin_to_text(txt))
|
||||
|
||||
|
||||
analyze_db_peaks('audio_files/761_convergePitch_2.wav', force=False)
|
||||
# analyze_lsb('audio_files/761.wav')
|
||||
Reference in New Issue
Block a user