给定两个文本:src与dest,其中dest是根据src经过编辑得到的,编辑操作包括:删除部分单词、添加部分单词、修改部分单词。
我们希望找到dest中的单词与src中单词之间的映射关系。
我们对dest进行断句分词等成立后得到两个序列:NEW, Sn。其中NEW[i]表示dest的第i个单词的内容,Sn[i]表示第i个单词属于dest的第几个句子。
我们对src进行断句分词等处理后,得到两个序列OLD, So。其中OLD[j]表示src的第j个单词的内容,So[i]表示第j个单词属于src的第几个句子。
我们令条件X = (NEW, OLD, Sn, So)
我们定义序列Y,其中j=Y[i]表示NEW[i]与OLD[j]存在映射关系。如果OLD中不存在与NWE[i]匹配的单词,那么NEW[i]=-1。
条件随机场模型是一个条件概率表达式:
P(Y|X) ~ exp{ -total_loss(X,Y) }
total_loss通过以下约束规则进行建模,使得total_loss最小的Y就是我们要求的最优的匹配方案。
经过上面的分析,我们发现我们构建的条件随机场是一个线形链的,损失函数可以分解为关于两个相邻的标签的损失函数的累加,其最优解等价于一个单源最短路径问题。可以采用dijkstra算法与alpha_star算法求最优解。当然还有经典的vertbi算法,vertbi算法本质上是一个动态规划算法,需要计算很多无用的状态,时间复杂度较高,舍弃不用。而dijkstra算法在文本长度较大时也比较慢,最终我采用了alpha_star算法。
from typing import Set, List, Tuple, Dict
from functools import lru_cache
from heapq import heappush, heappop
from collections import Counter
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk import PorterStemmer
from bisect import bisect_left, bisect_right
from fuzzywuzzy import fuzz
class TextAlignment():
def __init__(self, src: List[str], dest: List[str]):
self.src: str = src
self.dest: str =dest
self.old: List[str] = ['<BOS>']
self.new: List[str] = ['<BOS>']
self.So: List[int] = [-1]
self.Sn: List[int] = [-1]
self.preprocess()
self.old.append('<EOS>')
self.new.append('<EOS>')
self.So.append(-1)
self.Sn.append(-1)
def __call__(self)->List[int]:
loss, Y = self.alpha_star()
return self.post_process(Y)
def preprocess(self):
for sid, sent in enumerate(sent_tokenize(self.src)):
for w in word_tokenize(sent):
self.old.append(w)
self.So.append(sid)
for sid, sent in enumerate(sent_tokenize(self.dest)):
for w in word_tokenize(sent):
self.new.append(w)
self.Sn.append(sid)
def alpha_star(self)->Tuple[int, List[int]]:
M,N = len(self.old), len(self.new)
NO_MATCH: int = -1
word_positions: Dict[str, List[int]] = {}
for i,w in enumerate(self.old):
if w not in word_positions:
word_positions[w] = [i]
else:
word_positions[w].append(i)
#计算可以由当前状态衍生出的子状态集。
def next_states(s)->List[Tuple[int, int, int]]:
i,j,k = s
_i = i+1
if _i==N-1: #最后一个'<EOS>'必须与'<EOS>'进行匹配。
return [(_i,M-1,M-1)]
ans: List[Tuple[int, int, int]] = []
#让NEW[_i]=NO_MATCH,表示让NEW[_i]失配,此时有_j=NO_MATCH, _k=k。_k=k表示上一个发生完全匹配的位置没有变更。
ans.append((_i, NO_MATCH, k))
#遍历所有与NEW[_i]匹配的所有位置_j,严格保障_j>k
w: str = self.new[_i]
if w not in word_positions:
return ans
positions: List[int] = word_positions[w]
lo: int = bisect_right(positions, k)
for _j in positions[lo:]:
if _j>k:
if _j-k>1000: break #连续1000个OLD中的单词都没有被匹配上被认为是不合理的
ans.append((_i, _j, _j))
if len(ans)>100: break #进行剪枝,最多衍生出100个子状态
return ans
#估算从状态s到达结束状态需要的最低代价
D: int = 100
@lru_cache(maxsize=None)
def new_counter(i: int)->Counter: #动态规划算法计算NEW[i:N]各个单词的频数
if i>=N:
return Counter()
return new_counter(i+D) + Counter(self.new[i:i+D])
@lru_cache(maxsize=None)
def old_counter(j: int)->Counter: #动态规划算法计算OLD[j:M]各个单词的频数
if j>=M:
return Counter()
return old_counter(j+D) + Counter(self.old[j:j+D])
#强行采用记忆化深搜技术,将大部分的未知状态转换到已知状态上。
@lru_cache(maxsize=None)
def dismatch_count(i: int, j: int)->int:
if i%D or j%D:
if i%D!=0:
i = (i//D+1)*D #放大i,让i向上到达最近的D的整数倍
j = (j//D)*D #缩小j,让j向下到达最近的D的整数倍
#因为dismatch_count函数随i递减随j递增,因此放大i与缩小j之后得到的值比期望的值要小,符合启发函数的不高估要求
return dismatch_count(i,j)
#只有i与j都是D的整数倍时才进行具体的计算
ans = 0
c1 = new_counter(i)
c2 = old_counter(j)
c = c1-c2
for w,n in c.items():
if n>0:
ans += n
return ans
#A*算法使用的启发函数,该函数计算迅速,估算的值对于搜索具有很强的指导作用,可以加速上百倍。
def heuristic(s)->int: #估算出的失配的单词数量*1万作为当前状态必然要承受的风险损失。
#return 0
i,j,k = s
return dismatch_count(i+1,k+1)*1e4 + (N-i)*4000 #(N-i)*4000用于限制回滚到旧状态,尽可能探索最新的状态。
#A*算法的主干流程
OPEN: List[Tuple[int, int, Tuple[int, int, int], Tuple[int, int, int]]] = []
CLOSE: Dict[Tuple[int,int,int], int] = {}
PREDECESSOR: Dict[Tuple[int, int, int], Tuple[int, int, int]] = {}
s = (0,0,0)
heappush(OPEN, (heuristic(s), 0, s,(-1,-1,-1))) #将起始状态以及对应的损失0压入小根堆中
#backtrace根据当前最优状态搜索其前驱状态,最终追踪到起始状态上,从而得到最优路径。
def backtrace(s)->List[int]:
ans: List[int] = []
i,j,k = s
while i>=0:
ans.append(j)
i,j,k = PREDECESSOR[(i,j,k)]
ans.reverse()
return ans
#计算从状态s1=(i,j,k)到达状态s2=(_i,_j,_K)所付出的代价
def weight(s1, s2)->int:
i,j,k = s1
_i,_j,_k = s2
ans: int = 0
if _j==NO_MATCH: #没有发生匹配,将受到极大的惩罚,如果允许,应该尽可能匹配足够多的单词。
ans += 1e4
return ans
if j==NO_MATCH:
return ans
if self.Sn[i]==self.Sn[_i]: #NEW中两个连续的单词位于同一个句子中。
ans += self.So[_j] - self.So[j] #响应地,在OLD中也应该位于同一个句子,偏离越大惩罚越大。
ans += (_j-j-1) #在OLD中对应的两个单词最好也是相邻的
else:
if self.So[_j]==self.So[j]: #NEW中两个单词不在同一个句子中时,映射到OLD中后也不应该在同一个句子里。
ans += 1
return ans
while OPEN:
#print(OPEN)
f,g,s,p = heappop(OPEN)
i,j,k = s
if i==N-1:
PREDECESSOR[s] = p
return g, backtrace(s)
if s not in CLOSE or CLOSE[s]>g:
CLOSE[s] = g
PREDECESSOR[s] = p
for _s in next_states(s):
_g = g+weight(s,_s)
_h = heuristic(_s)
_f = _g+_h
heappush(OPEN, (_f,_g, _s, s))
return -1, []
def post_process(self, Y: List[int])->List[int]:
M: int = len(self.old)
N: int = len(self.new)
#L,R是两个已经确定产生匹配的位置,开区间内部尚未发生匹配,经过该函数后,开区间内的单词之间将产生模糊匹配。
def fuzzy_match(L: Tuple[int,int], R: Tuple[int,int])->List[Tuple[int,int]]:
NO_MATCH: int = -1
ans: List[Tuple[int,int]] = []
CLOSE: Dict[Tuple(int,int,int), int] = {}
PREDECESSOR: Dict[Tuple(int,int,int), Tuple(int,int,int)] = {}
OPEN: List[Tuple(int,int,int,int,int,int,int)] = []
heappush(OPEN, (0, L[0], L[1], L[1], -1,-1,-1))
#计算从当前状态跳转到下一个状态付出的代价
def weight(i,j,k, _i,_j,_k)->int:
ans: int = 0
if _j==NO_MATCH: #没有发生匹配,将受到极大的惩罚,如果允许,应该尽可能匹配足够多的单词。
ans += 10000
else:
ans += 100*(100-fuzz.ratio(self.new[_i], self.old[_j])) #发生匹配的两个单词之间的编辑距离作为损失的一部分。
if j==NO_MATCH:
ans + 10000
if j==NO_MATCH or _j==NO_MATCH:
return ans
if self.Sn[i]==self.Sn[_i]: #NEW中两个连续的单词位于同一个句子中。
ans += self.So[_j] - self.So[j] #响应地,在OLD中也应该位于同一个句子,偏离越大惩罚越大。
ans += (_j-j-1) #在OLD中对应的两个单词最好也是相邻的
else:
if self.So[_j]==self.So[j]: #NEW中两个单词不在同一个句子中时,映射到OLD中后也不应该在同一个句子里。
ans += 1
return ans
def next_states(i,j,k)->List[Tuple[int, int, int]]:
ans: List[Tuple[int,int,int]] = []
_i = i+1
if _i==R[0]:
return [(R[0],R[1],R[1])]
ans.append((_i,NO_MATCH,k))
for _j in range(k+1, R[1]):
if _j-j>50: return ans
ans.append((_i,_j,_j))
return ans
def backtrace(i,j,k)->List[Tuple[int,int]]:
ans: List[Tuple[int,int]] = []
while i>L[0]:
ans.append((i,j))
i,j,k = PREDECESSOR[(i,j,k)]
ans.reverse()
return ans
while OPEN:
#print(OPEN)
loss, i,j,k,_i,_j,_k = heappop(OPEN)
#print(loss,self.new[i],self.old[j] if j>=0 else None)
if i==R[0]:
#PREDECESSOR[(i,j,k)] = (_i,_j,_k)
return backtrace(_i,_j,_k)
if (i,j,k) not in CLOSE or CLOSE[(i,j,k)]>loss:
CLOSE[(i,j,k)] = loss
PREDECESSOR[(i,j,k)] = (_i,_j,_k)
for _i,_j,_k in next_states(i,j,k):
_loss = loss + weight(i,j,k,_i,_j,_k)
#print(loss, _loss, (i,j,k), (_i,_j,_k))
heappush(OPEN, (_loss,_i,_j,_k, i,j,k, ))
return []
pos2pos: List[Tuple[int, int]] = [(i,j) for i,j in enumerate(Y) if j>=0]
new_pos2pos: List[Tuple[int, int]] = list(pos2pos)
for L,R in zip(pos2pos[0:-1], pos2pos[1:]):
new_pos2pos += fuzzy_match(L,R)
new_pos2pos.sort()
return [j for i,j in new_pos2pos]
def display(self, Y: List[str]):
import pandas as pd
pd.set_option('display.max_rows', 30000)
M: int = len(self.old)
A: List[str] = self.old
B: List[str] = ['' for _ in range(M)]
for i,j in enumerate(Y):
B[j] = self.new[i]
df = pd.DataFrame({'OLD':A, 'NEW':B})
print(df)
if __name__=='__main__':
import time
import random
src: str = '''Search the all of the world's informations, including webpages, images, videos and more. Um, Umh, Google has so many special features to help you find exactly no matter what you're looking ...'''
dest: str= '''People searching the world's information, It include webpages, images, Music, videos and other more. It has many very specifically features to help finding exactly what you're looking ...'''
t0=time.time()
text_alignment = TextAlignment(src, dest)
Y = text_alignment()
t1=time.time()
text_alignment.display(Y)
#print(Y)
print(f'time cost={t1-t0}')
#print(Y)执行上面的python文件,输出如下:
OLD NEW
0 <BOS> <BOS>
1 Search Searching
2 the
3 all
4 of
5 the the
6 world world
7 's 's
8 informations information
9 , ,
10 including include
11 webpages webpages
12 , ,
13 images images
14 , ,
15 videos videos
16 and and
17 more more
18 . .
19 Um
20 ,
21 Umh
22 ,
23 Google
24 has has
25 so
26 many many
27 special specifically
28 features features
29 to to
30 help help
31 you
32 find finding
33 exactly exactly
34 no
35 matter
36 what what
37 you you
38 're 're
39 looking looking
40 ... ...
41 <EOS> <EOS>
time cost=0.0011327266693115234我们可以看到新文本虽然在旧文本上做了大量的修改,但是NEW中的单词都找到了与之相对应的OLD中的单词。根据对齐结果Y,我们可以很轻松地指出对OLD做了哪些修改。
另外说一下我做文本对齐的初衷是因为最近做AI视频编辑相关的项目,原始的视频经过ASR后可以得到video transcript,video transcript包含有音频中每个单词被播放的起止时间戳信息,我们去掉时间戳信息后得到的是一个原始的文本:old_transcript_text。接下来我们把old_transcript_text输入给openai/chatgpt,让chatgpt对old_transcript_text进行一些编辑操作得到一个更加简洁的new_transcript_text。chatgpt的编辑操作会删除old_transcript_text中的部分填充词、重复的短语、无意义的语气词、纠正ASR识别错误的一些单词,因此得到的new_transcript_text与old_transcript_text之间会存在着不小的差异。我们只有把new_transcript与old_transcript中的单词,才能确定new_transcript中每个单词对应在音频中的起止时间戳,这样才能够准确地进行音视频剪辑操作。因此这个文本对齐算法对于基于transcript的音视频剪辑任务是十分有用的。
最初我使用了dijkstra算法搜索最优匹配方案,但随着OLD与NEW序列长度的增加到10000个单词时,算法耗时已经达到了分钟级。后来采用了alpha_star算法,引入了启发函数,但启发函数本身的计算也十分耗时。于是又引入了启发函数采样以及估算启发函数的下界的技巧,终于将匹配时间控制到了秒级。
后来为了进一步加速,放弃了搜索最优解,转而搜索可用的次优方案,因此在启发函数中引入了一项(n-i)*4000,这一项纯粹是为了加速,用于避免探索非常陈旧的状态,去掉该项启发函数绝对不会高估可以保障搜索到最优解,加入该项虽然起到了加速的作用,但是也引入了匹配错误的风险。经过以上的迭代优化后,最终算法有了一个非常高的性能,在25000单词量级,文本编辑率达到70%的条件下,仍然在1秒以内搜到了最优匹配。
OLD NEW IDX
0 <BOS> <BOS> 0
1 Introduction Introduction 1
2 Sam Sam_XYZ 2
3 Altman
4 ( ( 3
... ... ... ...
25384 you
25385 next
25386 time time 12748
25387 . ._XYZ 12749
25388 <EOS> <EOS> 12750
[25389 rows x 3 columns]
time cost=0.9011058807373047