Code:
#usage: python3 solve.py ngram_file_path [--threads N] [--iters N]
"""Joint segmentation + substitution solver for chunked homophonic ciphers.
THE CIPHER
The ciphertext is whitespace-separated groups. Each group encodes 2-3 plaintext
letters Each letter is written as a TOKEN of 1-3 characters drawn from a
separate alphabet per position, so the same string means different things
in different positions ('z' in position 1 is a different symbol than 'z'
in position 2). Case matters ('T' != 't').
Example from cipher2: group "Kzz" splits "K"+"z"+"z",group "FaiT" splits
"F"+"ai"+"T"
Decrypting therefore means solving two coupled unknowns:
segmentation - where to cut each group into tokens (boundaries are not
marked; "FaiT" could be F+ai+T, Fa+i+T, F+a+iT, ...)
substitution - which plaintext letter each (position, token) symbol means
SPACE is a valid plaintext letter (value 26, shown as '_').
Spaces never enter n-gram scoring: they are removed from the stream and the
remaining letters are scored normally with an ordinary 26-letter model (a
window simply reads the next n non-space letters, joining across spaces).
Some groups may carry only two tokens: options include every 2-token reading,
which keeps the 3-slot layout with one GAP slot (value 27, shown as '-') that
encodes no letter at all.
STAGES
1. build_options: enumerate plausible splits per group. Candidate first/last
tokens come from frequent group prefixes/suffixes in the data. A local
search then picks one split per group that minimizes the total number
of distinct tokens ("min-alphabet seed", stored as option 0), and a
second pass admits extra splits whose parts are mostly supported by that
seed. Result: each group gets a short menu of options.
2. build_problem: number everything. Each distinct (position, token) pair
becomes a symbol id; each group becomes a run of "slots" (one plaintext
letter each) in one flat array of m slots; each option becomes a list of
symbol ids, one per slot.
3. load_ngram: read the n-gram table that scores how English a letter
sequence looks (table[index of n letters] = log score).
4. anneal (the search): simulated annealing over the joint state
choice[g] = which option group g currently uses
letter[s] = which plaintext letter symbol s currently means
decoding is then just plain[slot] = letter[symbol in that slot], and the
whole decode is rescored incrementally as moves change it. Three move
types: flip one symbol's letter, swap the letters of two same-position
symbols, or switch a group to another split option. Workers restart from
random states (hot anneal, T0_HOT), then repeatedly KICK their best state
(randomize a small fraction of letters and splits) and repair it with a
COLD anneal (T0_KICK) - reheating a near-solution would destroy it. Each
pass ends with a greedy polish that accepts any single improving move.
5. main: run workers in parallel processes, print progress every 5 s, report
the overall best and write the decode + recovered key to joint_out.txt.
OBJECTIVE (what "better" means; all terms maintained in O(1) per move)
objective = ngram_total * H**ENTW * exp(-LAM*D - MU*V)
ngram_total summed n-gram log score of the decode with spaces removed
H entropy of the letter distribution (kills "thethethe" collapse)
D distinct active symbols (description-length penalty)
V same-position letter collisions (real key columns ~ bijections)
#spaces how many slots decode to space (keeps spaces from spreading)
STRUCTURES
Problem (build_problem): opts[g][o] = option o of group g as symbol ids,
base[g]/K[g] = the group's slot range, sympos[s]/symtok[s] = symbol metadata.
State: the current assignment (choice, letter), the decoded text (plain),
occ[s] = which slots each symbol currently occupies, and running aggregates
(n-gram total, letter counts, D, V) so every move is evaluated incrementally
and can be reverted exactly.
"""
import argparse
import math
import multiprocessing
import os
import random
import sys
import time
from array import array
from collections import Counter
from types import SimpleNamespace
ENTW = 1.0 # entropy exponent
LAM = 0.01 # weight of the active-symbol count D
MU = 0.05 # weight of the collision count V
T0_HOT = 8000.0 # start temperature for fresh random restarts
T0_KICK = 150.0 # repair temperature after a kick: must stay <300
TEND = 1.0 # end temperature (geometric schedule)
KICK = 0.05 # fraction of letters/splits randomized when kicking the incumbent
STALE_MAX = 8 # kicked rounds without improvement before a fresh random restart
RESTART_ITERS = 300_000 # annealing iterations per restart
P_SPLIT = 0.2 # fraction of moves that re-segment a group
P_SWAP = 0.3 # fraction of moves that swap two same-position letters
MINF = 3 # min frequency for a data-driven affix to enter the token pools
#Nablator's:
CIPHERTEXT = "tiiaoean aoooan yiielioa fiao qaaaenuy tuiib taaoool tiiioa teal qeioa nuegf temouy bumean hib qiaiean \
yiinn buelben yepfitioa tiimean qeioa qeal qeoan tiiaoean taparioa teaii tuellen qeemiunh qeunh taaooq lieliea aiiyun \
temean qaouy fvameean tafitean hiq qial tet qiyonuy liaeiinn fviiean aminn fvt fvegn tuei qiel yepaoioa fiao lim liii \
yepfitioa tiimean qeinn buii daiiean qeb bufitar qiel hiaaeq fvai tiiemi tiiaoean aaiial taegyun aameioa gamean fvelq \
kib fvelq tufityun aaaenuy hipen tiiyon liiiq liaei buelal nuyopen liii lielben liaeial tiiaiipen aaooq aeial yiiinn \
buooean aaeeean fvel fiemino qoemipen fvai liel limal nuyo hioouy lieliea aminn fvioa tiiaaen liaaenuy afitean qeaaeioa \
teiial qeaeiean aminn fvoan taaooiea lielpen aaaef tiiaooan taiiinn liema buelben aemaf taemaoan teaii qiame tiiaoean \
aeial qeopen fifitioa qom bufitean aaiean daaeq fvai uaaeouy liemif teinn yiiemaoan tiiaoean teaiipen aaoal uaii \
daiiean qepen tuamiiea fviiq fvai lielb nuoben lieliea aminn bum lim liii buel qiminn fvfitunh liiiean aonuy taaiib \
qifitben fvai teaeiouy liemaq aameoan taal aelal tiioouy bueg nuaaenuy koal kaii qifit fieminuy temouy tuaeiq fvai \
nuaaenuy koal kaii buel tuelouy fvaaeben aaeioan hiaii fiyool kiaiiouy aemiouy aemiq kiaiiouy aameoan taei qiame \
fifityun qomoan kfital qoaoyun aemiouy aiiioa qaeman yiial aeiean buelioa qeamin fviipen aaooan bux"
#Byatan's
CIPHERTEXT = "FaiT Kzz nmnuy maig qar Tiooiea bar nmaei Tmz qooiea Tiaeean bar nmaei Teib Tiiiiz Tieir Kimb mvg Kaiin \
Fvg Tiiig Kieiiea kiin Fmq Fiiiiz mayun Taiyun qiiiaT mmq TIeig faaig Fiaean Kaaeaei Faiyun Foonuy Tiavinn Tiaeaei Tiiiig \
FaiT Kzz KaEg qiimiea mool mail maiiea Tiiiig Kiiag Kieiiea fieeean Tiig Kaiiea naaiben ner naek qeK niinuy mmiea TIayun \
nat Fooix famean fvT Kiiiyun Kaaaouy miig mmn tauYnh Fioog KaeiT kiaq meaei Tiavinn Tienuy Faiyun Foog fiin FvT Tiaaouy \
qayun Tiaean Kamean Fiin Faaei mael Fir Tian Fooyun Fiiiyun Faiyun FooT fiiir Tian Fooyun Fiiiyun Tiooz fieik faoob Kaeyun \
mmr teaei Tiaeean Fml mmq Fiaq meaei Tiavinn Tiiin Fooiea Tiaeean bar nmaei Teib miiiz fiiig Kaiin Faq Fmyun Kier mvr faaiaei \
Fmyun Taeaei Kamg Tieeg Faiyun nat Fooinuy mail Tiiiig qeK niinuy naeyun naz miiit Fooiouy Kianuy Kaiin Feig maaean TiiuYnh \
faoyun nanuy qiieiyun Fzl Fiag fiinuy faaiz Kaayun faob mmn Taig Kaayun Tieaei Keiyun nmyun naaean Fiig fiaean mvq qiimn maiK \
Fil meq qiiipen Kieaei Kiin Feez Kimz Kaal faainuy Fmean Fiit Fooiq Teiyun KaiT Kaiiean Tmpen Tial Taiz faaz fimg mail faaiiea \
Tiiiig Kieig TiiiT fiiinuy Tiiiiz Tieeg Kiar nmaei Kal kaeean Fmr nzg Kaeiz Kayun naean Fmg TaaT fiiinuy Timyun Taeaei nmnuy \
Tiaet fiaeben mail TIeig Tiiiig Fiiiz miinuy Fiiyun TaiK Kaab Fiviea qiial mooiyun nayun Kimaei KazT TiiiT fiiinuy Timyun \
Taeaei Tiaenuy TImr Feinuy kaaq qooil Fmean qiiiayun mzg Kaeiz Ka"
# ---------------------------------------------------------------- option gen
def all_splits(group, k):
"""Every way to cut `group` into k tokens of width 1-3."""
if k == 0:
return [()] if not group else []
return [
(group[:w],) + rest
for w in (1, 2, 3)
if w <= len(group)
for rest in all_splits(group[w:], k - 1)
]
def splits_2_or_3(group, k, keep=lambda t: True):
"""Ways to read `group` as up to k tokens: every k-token split, plus every
(k-1)-token split (some groups simply carry one token fewer). A short
reading keeps k slots with one GAP slot ("" = no letter) at slot 1..k-1 so
the slot layout stays fixed. `keep` filters the candidate tuples."""
opts = [t for t in all_splits(group, k) if keep(t)]
for sub in all_splits(group, k - 1):
for j in range(1, k):
t = sub[:j] + ("",) + sub[j:]
if t not in opts and keep(t):
opts.append(t)
return opts
def build_options(cipher_text):
"""
A min-alphabet local search picks each group's option 0 (seed segmentation);
round B then admits extra splits where >=2 of the 3 parts are already supported.
"""
groups = cipher_text.split()
pre_cnt, suf_cnt = Counter(), Counter()
for g in groups:
for w in (1, 2, 3):
if len(g) >= w + 2:
pre_cnt[g[:w]] += 1
suf_cnt[g[-w:]] += 1
pre = {t for t, c in pre_cnt.items() if c >= MINF}
suf = {t for t, c in suf_cnt.items() if c >= MINF}
group_opts = []
for g in groups:
opts = splits_2_or_3(
g, 3, lambda t: t[0] in pre and (t[2] in suf or not t[2]))
if not opts: # pools miss this group: allow any split
opts = splits_2_or_3(g, 3)
group_opts.append(opts)
# min-alphabet seed: coordinate descent on total distinct tokens per column
def cost(choice):
"""Total number of distinct tokens across the three positions."""
return sum(len({t[i] for t in choice}) for i in range(3))
best, rng = None, random.Random(1)
for _ in range(40):
choice = [rng.choice(o) for o in group_opts]
improved = True
while improved:
improved = False
for i, opts in enumerate(group_opts):
cur = cost(choice)
for o in opts:
old, choice[i] = choice[i], o
if cost(choice) < cur:
cur, improved = cost(choice), True
else:
choice[i] = old
if best is None or cost(choice) < cost(best):
best = list(choice)
# round B: admit splits with >=2 of 3 parts supported by the seed alphabets
seed_alpha = [{t[i] for t in best} for i in range(3)]
s0, s1, s2 = seed_alpha[0] | pre, seed_alpha[1], seed_alpha[2] | suf
for gi, g in enumerate(groups):
have = set(group_opts[gi])
for t in splits_2_or_3(g, 3):
if t not in have and (t[0] in s0) + (t[1] in s1) + (t[2] in s2) >= 2:
group_opts[gi].append(t)
group_opts[gi].sort(key=lambda t: t != best[gi]) # seed becomes option 0
return group_opts
def build_problem(group_opts):
"""Number everything: (position, token) -> symbol id, groups -> slot ranges."""
ids = {}
def sid(pos, tok):
"""Stable symbol id for a (position, token) pair, minted on first use."""
return ids.setdefault((pos, tok), len(ids))
base, K, opts, b = [], [], [], 0
for gopts in group_opts:
k = len(gopts[0])
base.append(b)
K.append(k)
b += k
opts.append([[sid(p, tok) for p, tok in enumerate(t)] for t in gopts])
sympos = [pos for pos, _ in ids]
symtok = [tok for _, tok in ids]
isgap = [tok == "" for _, tok in ids] # GAP slots of 2-token readings
multi = [g for g, o in enumerate(opts) if len(o) > 1]
return SimpleNamespace(
nsym=len(ids),
m=b,
base=base,
K=K,
opts=opts,
multi=multi,
sympos=sympos,
symtok=symtok,
isgap=isgap,
)
# ---------------------------------------------------------------- ngram model
def load_ngram(path):
"""Load an AZdecrypt-style n-gram table; returns (n, A, table) where A is
the model's alphabet size -- 27 if it scores SPACE as symbol 26 -- and
table is indexed by the base-A big-endian value of an n-symbol window."""
data = open(path, "rb").read()
for A in (26, 27): # binary format: raw A^n uint8 table
for n in range(3, 9):
if len(data) == A**n:
return n, A, data
# text format: records of <n symbols><3-char score> (symbols A-Z or space).
# Find n by alignment: the last score char of every record is a digit.
for n in range(3, 9):
rec = n + 3
if all(48 <= data[off + n + 2] <= 57
for off in range(0, min(len(data) - rec, 50 * rec) + 1, rec)):
break
rec = n + 3
end = len(data) - rec + 1
A = 27 if any(32 in data[off : off + n] for off in range(0, end, rec)) else 26
table = array("h", bytes(2 * A**n))
for off in range(0, end, rec):
idx = 0
for c in data[off : off + n]:
if c == 32:
c = 91 # treat space as the symbol after 'Z' (index 26)
if not 65 <= c <= 90 + (A == 27):
idx = -1
break
idx = idx * A + c - 65
if idx >= 0:
table[idx] = int(data[off + n : off + rec])
return n, A, table
# ---------------------------------------------------------------- search state
class State:
"""Current assignment plus running aggregates, so obj() is O(1) and every
move updates only the n-gram windows it touches."""
def __init__(self, P, n, A, table):
"""Allocate all per-slot/per-symbol arrays; rebuild() must follow."""
self.P, self.n, self.A, self.table = P, n, A, table
self.letter = [0] * P.nsym # symbol -> plaintext letter 0-25
self.choice = [0] * len(P.opts) # group -> current option index
self.cursym = [0] * P.m # slot -> symbol occupying it
self.plain = bytearray(P.m) # slot -> decoded letter
self.occ = [[] for _ in range(P.nsym)] # symbol -> its active slots
self.posidx = [0] * P.m # slot -> index in occ[] (O(1) removal)
self.scnt = [0] * P.nsym # symbol -> active occurrence count
self.seen = [0] * P.m # window-dedup stamps for collect()
self.chg = [0] * P.m # changed-slot stamps for collect()
self.stamp = self.cstamp = 0
self.logm = math.log(P.m)
# --- aggregate bookkeeping -------------------------------------------
def add_act(self, pos, lt):
"""Count a new active symbol with letter lt in position pos (updates V)."""
self.V += self.nact[pos][lt] >= 1
self.nact[pos][lt] += 1
def del_act(self, pos, lt):
"""Remove an active symbol with letter lt from position pos (updates V)."""
self.nact[pos][lt] -= 1
self.V -= self.nact[pos][lt] >= 1
def occ_add(self, s, slot):
"""Mark symbol s active in slot; maintains occ/scnt/D and the V buckets."""
self.posidx[slot] = len(self.occ[s])
self.occ[s].append(slot)
self.scnt[s] += 1
if self.scnt[s] == 1:
self.D += 1
self.add_act(self.P.sympos[s], self.letter[s])
def occ_del(self, s, slot):
"""Inverse of occ_add: swap-pop the slot out of symbol s's active list."""
i, last = self.posidx[slot], self.occ[s][-1]
self.occ[s][i] = last
self.posidx[last] = i
self.occ[s].pop()
self.scnt[s] -= 1
if self.scnt[s] == 0:
self.D -= 1
self.del_act(self.P.sympos[s], self.letter[s])
@staticmethod
def xlx(c):
"""c*ln(c) with the 0*ln(0)=0 convention (entropy bookkeeping unit)."""
return c * math.log(c) if c > 0 else 0.0
def entropy(self):
"""Shannon entropy of the decoded letters: H = ln(m) - esum/m."""
return max(self.logm - self.esum / self.P.m, 1e-9)
def obj(self):
"""The objective: ngram * H**ENTW * exp(-LAM*D - MU*V )"""
return self.ng * self.entropy() ** ENTW * math.exp(
-LAM * self.D - MU * self.V)
def score_windows(self, starts):
"""Summed n-gram score of the windows STARTING at the given slots.
Spaces are removed from scoring: a window starts at a non-space slot
and reads the next n non-space letters, joining across spaces. Starts
that are spaces, or windows that run off the end, score 0. ng_total is
exactly this summed over every slot."""
plain, table, n, base = self.plain, self.table, self.n, self.A
m, total = self.P.m, 0
for st in starts:
if plain[st] >= 26: # spaces and gaps never start or join a window
continue
idx, got, j = 0, 0, st
while j < m:
c = plain[j]
if c < 26:
idx = idx * base + c
got += 1
if got == n:
total += table[idx]
break
j += 1
return total
def rebuild(self, rng, rand_letters, rand_choice):
"""Full re-init of all aggregates (once per restart); moves are incremental."""
P = self.P
self.occ = [[] for _ in range(P.nsym)]
self.scnt = [0] * P.nsym
self.D = self.V = 0
self.nact = [[0] * 28 for _ in range(3)]
if rand_letters:
self.letter = [rng.randrange(27) for _ in range(P.nsym)]
for s, gp in enumerate(P.isgap): # GAP slots always decode to no letter
if gp:
self.letter[s] = 27
for g in range(len(P.opts)):
if rand_choice:
self.choice[g] = rng.randrange(len(P.opts[g]))
for j, s in enumerate(P.opts[g][self.choice[g]]):
self.cursym[P.base[g] + j] = s
self.occ_add(s, P.base[g] + j)
for i in range(P.m):
self.plain[i] = self.letter[self.cursym[i]]
self.cnt = [0] * 28 # letter counts; 26 = spaces, 27 = gap slots
for c in self.plain:
self.cnt[c] += 1
self.esum = sum(self.xlx(c) for c in self.cnt)
self.ng = self.score_windows(range(P.m))
def collect(self, slots):
"""Deduped candidate window starts: every slot whose window could span
one of the changed `slots`, valid both before AND after the move. The
backward walk counts n-1 non-space slots but never counts a changed
slot (its space-status may flip), so the reach is always sufficient."""
self.cstamp += 1
cstamp, chg = self.cstamp, self.chg
for i in slots:
chg[i] = cstamp
self.stamp += 1
stamp, seen, plain, n = self.stamp, self.seen, self.plain, self.n
starts = []
for i in slots:
j, cnt = i, 0
while True:
if seen[j] != stamp:
seen[j] = stamp
starts.append(j)
if j == 0 or cnt >= n - 1:
break
j -= 1
if plain[j] < 26 and chg[j] != cstamp:
cnt += 1
return starts
# --- letter flip -------------------------------------------------------
def flip_letter(self, s, v):
"""Remap symbol s to letter v; returns the objective delta."""
o0 = self.obj()
self._lsave = (s, self.letter[s], self.esum)
starts = self.collect(self.occ[s])
old = self.score_windows(starts)
for i in self.occ[s]:
self.plain[i] = v
self._lngd = self.score_windows(starts) - old
self.ng += self._lngd
lold, c, cnt = self.letter[s], self.scnt[s], self.cnt
self.esum -= self.xlx(cnt[lold]) + self.xlx(cnt[v])
cnt[lold] -= c
cnt[v] += c
self.esum += self.xlx(cnt[lold]) + self.xlx(cnt[v])
self.letter[s] = v
if c > 0:
self.del_act(self.P.sympos[s], lold)
self.add_act(self.P.sympos[s], v)
return self.obj() - o0
def revert_letter(self):
"""Undo the most recent flip_letter exactly."""
s, lold, esum = self._lsave
v, c = self.letter[s], self.scnt[s]
for i in self.occ[s]:
self.plain[i] = lold
self.cnt[v] -= c
self.cnt[lold] += c
self.esum = esum
self.letter[s] = lold
self.ng -= self._lngd
if c > 0:
self.del_act(self.P.sympos[s], v)
self.add_act(self.P.sympos[s], lold)
# --- split flip --------------------------------------------------------
def flip_split(self, g, o):
"""Re-segment group g to option o; returns the objective delta."""
o0 = self.obj()
self._ssave = (g, self.choice[g], self.esum)
b, k = self.P.base[g], self.P.K[g]
starts = self.collect(range(b, b + k))
old = self.score_windows(starts)
self.apply_split(g, o)
self._sngd = self.score_windows(starts) - old
self.ng += self._sngd
return self.obj() - o0
def apply_split(self, g, o):
"""Hand group g's slots from the current option's symbols to option o's."""
b = self.P.base[g]
oldo, newo = self.P.opts[g][self.choice[g]], self.P.opts[g][o]
for j, (s0, s1) in enumerate(zip(oldo, newo)):
if s0 == s1:
continue
self.occ_del(s0, b + j)
self.occ_add(s1, b + j)
a, c = self.letter[s0], self.letter[s1]
if a != c:
self.esum -= self.xlx(self.cnt[a]) + self.xlx(self.cnt[c])
self.cnt[a] -= 1
self.cnt[c] += 1
self.esum += self.xlx(self.cnt[a]) + self.xlx(self.cnt[c])
self.cursym[b + j] = s1
self.plain[b + j] = c
self.choice[g] = o
def revert_split(self):
"""Undo the most recent flip_split exactly."""
g, old, esum = self._ssave
self.apply_split(g, old)
self.esum = esum # restore exactly (apply re-derives, may drift)
self.ng -= self._sngd
def to_text(plain):
"""Decoded letter values to display: '_' = space (26), '-' = gap (27)."""
return "".join("_-"[c - 26] if c >= 26 else chr(97 + c) for c in plain)
# ---------------------------------------------------------------- annealing
# inherited by forked workers; BEST_* are shared memory for progress reporting
PROB, NGRAM_N, NGRAM_A, NGRAM_TABLE = None, None, None, None
BEST_OBJ, BEST_PLAIN = None, None
def anneal(args):
"""Iterated local search until the iteration budget runs out. Fresh random
restarts (hot anneal) find candidate basins; then each round kicks the
incumbent best -- randomizing a KICK fraction of letters and splits -- and
repairs it with a COLD anneal, ratcheting toward the optimum. After
STALE_MAX kicks without improvement, start fresh again. Returns the
worker's best (obj, letters, choices)."""
tid, budget = args
P = PROB
rng = random.Random(0x9E3779B97F4A7C15 * (tid + 1))
S = State(P, NGRAM_N, NGRAM_A, NGRAM_TABLE)
best = (-math.inf, None, None)
incumbent, stale, first = None, 0, True
while budget > 0:
iters = min(RESTART_ITERS, budget)
budget -= iters
if incumbent is None or stale >= STALE_MAX:
# fresh start (worker 0's first restart keeps the seed segmentation)
S.rebuild(rng, rand_letters=True, rand_choice=not (first and tid == 0))
incumbent, stale, t0 = None, 0, T0_HOT
else:
# kick the incumbent and repair coldly
S.letter = [l if rng.random() > KICK else rng.randrange(27)
for l in incumbent[1]]
S.choice = list(incumbent[2])
for g in P.multi:
if rng.random() < KICK:
S.choice[g] = rng.randrange(len(P.opts[g]))
S.rebuild(rng, rand_letters=False, rand_choice=False)
t0 = T0_KICK
first = False
run_restart(S, rng, iters, t0)
ob = S.obj()
if incumbent is None or ob > incumbent[0]:
incumbent, stale = (ob, list(S.letter), list(S.choice)), 0
else:
stale += 1
if ob > best[0]:
best = (ob, list(S.letter), list(S.choice))
with BEST_OBJ.get_lock(): # publish for the progress printer
if ob > BEST_OBJ.value:
BEST_OBJ.value = ob
BEST_PLAIN[:] = bytes(S.plain)
return best
def run_restart(S, rng, iters, t0=T0_HOT):
"""One simulated-annealing pass followed by a greedy polish."""
P = S.P
cool = (TEND / t0) ** (1.0 / iters)
T = t0
for _ in range(iters):
T *= cool
mv = rng.random() # split / swap / flip per the P_* mix
if mv < P_SPLIT and P.multi:
g = P.multi[rng.randrange(len(P.multi))]
o = rng.randrange(len(P.opts[g]))
if o == S.choice[g]:
continue
d = S.flip_split(g, o)
if d < 0 and math.exp(d / T) < rng.random():
S.revert_split()
elif mv < P_SPLIT + P_SWAP:
# swap letters of two same-position symbols (picked via random
# slots so frequent symbols move more); keeps V unchanged
s1, s2 = S.cursym[rng.randrange(P.m)], S.cursym[rng.randrange(P.m)]
if s1 == s2 or P.sympos[s1] != P.sympos[s2]:
continue
if P.isgap[s1] or P.isgap[s2]:
continue
a, b = S.letter[s1], S.letter[s2]
if a == b:
continue
d = S.flip_letter(s1, b) + S.flip_letter(s2, a)
if d < 0 and math.exp(d / T) < rng.random():
S.flip_letter(s2, b)
S.flip_letter(s1, a)
else:
s = S.cursym[rng.randrange(P.m)]
if P.isgap[s]:
continue
v = rng.randrange(27)
if v == S.letter[s]:
continue
d = S.flip_letter(s, v)
if d < 0 and math.exp(d / T) < rng.random():
S.revert_letter()
# greedy polish at T=0: accept any improving flip / swap / split
improved, rounds = True, 0
while improved and rounds < 6:
improved, rounds = False, rounds + 1
for s in range(P.nsym):
if S.scnt[s] and not P.isgap[s]:
for v in range(27):
if v != S.letter[s]:
if S.flip_letter(s, v) > 0:
improved = True
else:
S.revert_letter()
for s1 in range(P.nsym):
if not S.scnt[s1] or P.isgap[s1]:
continue
for s2 in range(s1 + 1, P.nsym):
if not S.scnt[s2] or P.sympos[s1] != P.sympos[s2] or P.isgap[s2]:
continue
a, b = S.letter[s1], S.letter[s2]
if a == b:
continue
if S.flip_letter(s1, b) + S.flip_letter(s2, a) > 0:
improved = True
else:
S.flip_letter(s2, b)
S.flip_letter(s1, a)
for g in P.multi:
for o in range(len(P.opts[g])):
if o != S.choice[g]:
if S.flip_split(g, o) > 0:
improved = True
else:
S.revert_split()
def main():
"""Parse args, build the problem, run the workers, report the best result."""
ap = argparse.ArgumentParser(description=__doc__.split("\n")[0])
ap.add_argument("ngram")
ap.add_argument("--threads", type=int, default=os.cpu_count())
ap.add_argument(
"--iters",
type=int,
default=10_000_000,
help="total annealing iterations per thread "
f"(split into restarts of {RESTART_ITERS})",
)
args = ap.parse_args()
global PROB, NGRAM_N, NGRAM_A, NGRAM_TABLE, BEST_OBJ, BEST_PLAIN
PROB = build_problem(build_options(CIPHERTEXT))
NGRAM_N, NGRAM_A, NGRAM_TABLE = load_ngram(args.ngram)
print(
f"m={PROB.m} nsym={PROB.nsym} groups={len(PROB.opts)} "
f"multi={len(PROB.multi)} ngram n={NGRAM_N} alphabet={NGRAM_A}",
file=sys.stderr,
)
# fork so workers inherit the problem + ngram table; shared best for progress
ctx = multiprocessing.get_context("fork")
BEST_OBJ = ctx.Value("d", -math.inf)
BEST_PLAIN = ctx.Array("B", PROB.m)
start = time.time()
with ctx.Pool(args.threads) as pool:
pending = pool.map_async(anneal, [(t, args.iters) for t in range(args.threads)])
while not pending.ready(): # print best found every 5 seconds
pending.wait(5)
with BEST_OBJ.get_lock():
ob, plain = BEST_OBJ.value, bytes(BEST_PLAIN[:])
if ob > -math.inf:
text = to_text(plain)
print(
f"[{time.time() - start:5.0f}s] obj={ob:8.0f} {text[:90]}...",
file=sys.stderr,
)
results = pending.get()
best_obj, best_letter, best_choice = max(results)
# reconstruct the winner for reporting
S = State(PROB, NGRAM_N, NGRAM_A, NGRAM_TABLE)
S.letter, S.choice = best_letter, list(best_choice)
S.rebuild(random.Random(1), rand_letters=False, rand_choice=False)
decoded = to_text(S.plain)
print(
f"best obj={best_obj:8.0f} ng={S.ng} D={S.D} V={S.V} H={S.entropy():.4f} "
f"avg={S.ng / (PROB.m - NGRAM_N + 1):.2f}"
)
print(f"decoded({PROB.m}): {decoded}")
with open("joint_out.txt", "w") as f:
f.write(decoded + "\n--- key (active symbols) ---\n")
for s in range(PROB.nsym):
if S.scnt[s]:
f.write(
f"pos{PROB.sympos[s]} {PROB.symtok[s]:<4} -> "
f"{to_text([best_letter[s]])} (x{S.scnt[s]})\n"
)
f.write("--- splits ---\n")
for g, o in enumerate(best_choice):
toks = " ".join(PROB.symtok[s] for s in PROB.opts[g][o])
f.write(f"{o} {toks}\n")
if __name__ == "__main__":
main()