Fork me on GitHub

文本相似度计算

本文介绍文本相似度计算的各种方法,可以广泛应用在基于问答对匹配的问答系统中。

TF-IDF

$$
tfidf_i = tfidf = \frac{词i的数量}{词语总数}log\frac{总文档数}{包含词i的文档数}
$$

其中tf称为词频,idf为逆文档频率。

BM25

$$
BM25(i) = \frac{词i的数量}{总词数}\frac{(k+1)C}{C+k(1-b+b\frac{|d|}{avdl})}log(\frac{总文档数}{包含i的文档数}) \
C = tf=\frac{词i的数量}{总词数},k>0,b\in [0,1],d为文档i的长度,avdl是文档平均长度
$$

把 $1-b+b\frac{d}{avdl}$ 中的b看成0,那么此时中间项的结果为 $\frac{(k+1)tf}{k+tf}$ ,通过设置一个k,就能够保证其最大值为1,达到限制tf过大的目的。

即:
$$
\begin{align}
&\frac{(k+1)tf}{k+tf}= \frac{k+1}{1+\frac{k}{tf}} \qquad \qquad \qquad,上下同除tf
\end{align}
$$
k不变的情况下,上式随着tf的增大而增大,上限为k+1,但是增加的程度会变小。

在一个句子中,某个词重要程度应该是随着词语的数量逐渐衰减的,所以中间项对词频进行了惩罚,随着次数的增加,影响程度的增加会越来越小。通过设置k值,能够保证其最大值为k+1,k往往取值1.2

此外, $1-b+b\frac{d}{avdl}$ 的作用是用来对文本的长度进行归一化。例如在考虑整个句子的tdidf的时候,如果句子的长度太短,那么计算总的tdidf的值是要比长句子的tdidf的值要低的。所以可以考虑对句子的长度进行归一化处理。

可以看到,当句子的长度越短,$1-b+b\frac{|d|}{avdl}$的值是越小,作为分母的位置,会让整个第二项越大,从而达到提高短文本句子的BM25的值的效果。当b的值为0,可以禁用归一化, b往往取值0.75

fasttext

可以使用fasttext获取词向量,然后对一个句子中的所有词语的词向量进行平均,获取整个句子的向量表示,而且通过参数的控制,能实现N-garm的效果。

假设我们有文本a.txt如下:

1
2
3
我 很 喜欢 她
今天 天气 不错
我 爱 深度学习

那么我们可以实现获取句子向量的方法如下:

1
2
3
4
5
6
import fasttext.FastText as fasttext
#训练模型,设置n-garm=2
model = fasttext.train_unsupervised(input="a.txt", epoch=20, minCount=1, wordNgrams=2)
#获取句子向量,是对词向量的平均
print(model.get_sentence_vector("我 是 谁"))

默认生成的句向量是100维的。

pysparnn

pysparnn 使用的是一种 cluster pruning(簇修剪) 的技术,开始的时候对数据进行聚类,后续再有限个类别中进行数据的搜索,根据计算的余弦相似度返回结果。

数据预处理过程如下:

  1. 随机选择 $\sqrt{N}$ 个样本作为leader
  2. 选择非leader的数据(follower),使用余弦相似度计算找到最近的leader

当获取到一个问题q的时候,查询过程:

  1. 计算每个leader和q的相似度,找到最相似的leader
  2. 然后计算问题q和leader所在簇的相似度,找到最相似的k个,作为最终的返回结果

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import pysparnn.cluster_index as ci
from sklearn.feature_extraction.text import TfidfVectorizer
#1. 原始数据
data = [
'hello world',
'oh hello there',
'Play it',
'Play it again Sam',
]
#2. 原始数据向量化
tv = TfidfVectorizer()
tv.fit(data)
features_vec = tv.transform(data)
# 原始数据构造索引
cp = ci.MultiClusterIndex(features_vec, data)
# 待搜索的数据向量化
search_data = [
'oh there',
'Play it again Frank'
]
search_features_vec = tv.transform(search_data)
#3. 索引中传入带搜索数据,返回结果
print(cp.search(search_features_vec, k=2, k_clusters=2, return_distance=True))

此外,还可以设置两个大于0的数字b1和b2

  • b1表示在数据预处理阶段,每个follower选择b1个最相似的leader,而不是选择单独一个leader,这样不同的簇是有数据交叉的
  • b2表示在查询阶段,找到最相似的b2个leader,然后再计算不同的leader中下的topk的结果

通过增加b1和b2的值,我们能够有更大的机会找到更好的结果,但是这样会需要更加大量的计算。

ci.MultiClusterIndex(features, records_data, num_indexes) 中, num_indexes 能够设置b1的值,默认为2。

在搜索的过程中, cp.search(search_vec, k=8, k_clusters=10, return_distance=True,num_indexes)num_Indexes 可以设置b2的值,默认等于b1的值。

孪生网络

孪生神经网络由两个共享权值的网络的组成,通过两个输入,被DNN进行编码,得到向量的表示之后,根据实际的用途来制定损失函数。比如我们需要计算相似度的时候,可以使用余弦相似度,或者使用 $exp^{-||h^{left}-h^{right}||}$ 来确定向量的距离。孪生神经网络被用于有多个输入和一个输出的场景,比如手写字体识别、文本相似度检验、人脸识别等。

模型架构大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import torch.nn.functional as F
import torch.nn as nn
import config
import torch
class SiameseNetwork(nn.Module):
def __init__(self):
super(SiameseNetwork, self).__init__()
self.embedding = nn.Embedding(num_embeddings=len(config.sort_ws),
embedding_dim=300,
padding_idx=config.sort_ws.PAD)
self.gru1 = nn.GRU(input_size=300,
hidden_size=config.sort_hidden_size,
num_layers=config.sort_num_layers,
bidirectional=config.bidirectional,
batch_first=True)
self.gru2 = nn.GRU(input_size=config.sort_hidden_size * 8,
hidden_size=config.sort_hidden_size,
num_layers=1,
batch_first=True,
bidirectional=False)
self.fc = nn.Sequential(
nn.BatchNorm1d(config.sort_hidden_size * 2),
nn.Linear(config.sort_hidden_size * 2, config.sort_hidden_size),
nn.ELU(inplace=True),
nn.BatchNorm1d(config.sort_hidden_size),
nn.Dropout(config.sort_dropout),
nn.Linear(config.sort_hidden_size, config.sort_hidden_size),
nn.ELU(inplace=True),
nn.BatchNorm1d(config.sort_hidden_size),
nn.Dropout(config.sort_dropout),
nn.Linear(config.sort_hidden_size, 2),
)
def forward(self, input1, input2):
mask1, mask2 = input1.eq(config.sort_ws.PAD), input2.eq(config.sort_ws.PAD)
input1 = self.embedding(input1) # [batch_size, max_len, 300]
input2 = self.embedding(input2)
# output [batch_size, max_len, hidden_size*num_layer]
# hidden [numlayer*2, batch_size, hidden_size]
output1, _ = self.gru1(input1)
output2, _ = self.gru1(input2)
output1_align, output2_align = self.sort_attention_align(output1, output2, mask1, mask2)
q1_combined = torch.cat([output1, output1_align, self.submul(output1, output1_align)], dim=-1) # [batch_size, max_len, hidden_size*8]
q2_combined = torch.cat([output2, output2_align, self.submul(output2, output2_align)], dim=-1)
# batch_size * seq_len * (1 * hidden_size)
q1_compose, _ = self.gru2(q1_combined)
q2_compose, _ = self.gru2(q2_combined)
# 进行Aggregate操作,也就是进行pooling
# input: batch_size * seq_len * (1 * hidden_size)
# output: batch_size * (1 * hidden_size)
q1_rep = self.apply_pooling(q1_compose)
q2_rep = self.apply_pooling(q2_compose)
# Concate合并到一起,用来进行计算相似度
out = torch.cat([q1_rep, q2_rep], dim=-1) # batch_size * (2 * hidden_size)
out = self.fc(out) # batch_size * 2
return out
def submul(self, x1, x2):
mul = x1 * x2
sub = x1 - x2
return torch.cat([sub, mul], dim=-1)
def apply_pooling(self, output):
avg_pooled = F.avg_pool1d(output.transpose(1, 2), kernel_size=output.size(1)).squeeze(-1)
max_pooled = F.max_pool1d(output.transpose(1, 2), kernel_size=output.size(1)).squeeze(-1)
return avg_pooled + max_pooled
def sort_attention_align(self, x1, x2, mask1, mask2):
'''
x1: batch_size * seq_len_1 * hidden_size
x2: batch_size * seq_len_2 * hidden_size
mask1:x1中pad的位置为1,其他为0
mask2:x2中pad的位置为1,其他为0
'''
# attention: batch_size * seq_len_1 * seq_len_2
attention_weight = torch.matmul(x1, x2.transpose(1, 2))
# mask1 : batch_size,seq_len1
mask1 = mask1.float().masked_fill_(mask1, float('-inf'))
# mask2 : batch_size,seq_len2
mask2 = mask2.float().masked_fill_(mask2, float('-inf'))
# weight: batch_size * seq_len_1 * seq_len_2
weight1 = F.softmax(attention_weight + mask2.unsqueeze(1), dim=-1)
# batch_size*seq_len_1*hidden_size
x1_align = torch.matmul(weight1, x2)
# 同理,需要对attention_weight进行permute操作
weight2 = F.softmax(attention_weight.transpose(1, 2) + mask1.unsqueeze(1), dim=-1)
x2_align = torch.matmul(weight2, x1)
return x1_align, x2_align

BERT

也可以使用BERT进行文本相似度计算,这里使用 https://github.com/IAdmireu/ChineseSTS/blob/master/simtrain_to05sts.txt 数据集,两个句子的相似度范围从0到5。然后就和上篇文章中的方法一样,转化为文本分类问题就ok了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import torch
import time
import torch.nn as nn
import torch.nn.functional as F
from transformers import BertModel, BertTokenizer
import pandas as pd
import numpy as np
from tqdm import tqdm
from torch.utils.data import *
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
path = "./"
bert_path = "hfl/chinese-roberta-wwm-ext"
tokenizer = BertTokenizer(vocab_file="vocab.txt") # 初始化分词器
input_ids = [] # input char ids
input_types = [] # segment ids
input_masks = [] # attention mask
label = [] # 标签
pad_size = 64 # 也称为 max_len
with open(path + "train.txt", 'r', encoding='utf-8') as f:
for i, l in tqdm(enumerate(f)):
x1, x2, y = l.strip().split('\t')
x1 = tokenizer.tokenize(x1)
x2 = tokenizer.tokenize(x2)
tokens = ["[CLS]"] + x1 + ["[SEP]"] + x2 + ["[SEP]"]
# 得到input_id, seg_id, att_mask
ids = tokenizer.convert_tokens_to_ids(tokens)
types = [0] * len(ids)
masks = [1] * len(ids)
# 短则补齐,长则切断
if len(ids) < pad_size:
types = types + [1] * (pad_size - len(ids)) # mask部分 segment置为1
masks = masks + [0] * (pad_size - len(ids))
ids = ids + [0] * (pad_size - len(ids))
else:
types = types[:pad_size]
masks = masks[:pad_size]
ids = ids[:pad_size]
input_ids.append(ids)
input_types.append(types)
input_masks.append(masks)
assert len(ids) == len(masks) == len(types) == pad_size
label.append([int(float(y))])
# 随机打乱索引
random_order = list(range(len(input_ids)))
np.random.seed(2020) # 固定种子
np.random.shuffle(random_order)
# 4:1 划分训练集和测试集
input_ids_train = np.array([input_ids[i] for i in random_order[:int(len(input_ids)*0.8)]])
input_types_train = np.array([input_types[i] for i in random_order[:int(len(input_ids)*0.8)]])
input_masks_train = np.array([input_masks[i] for i in random_order[:int(len(input_ids)*0.8)]])
y_train = np.array([label[i] for i in random_order[:int(len(input_ids) * 0.8)]])
print(input_ids_train.shape, input_types_train.shape, input_masks_train.shape, y_train.shape)
input_ids_test = np.array([input_ids[i] for i in random_order[int(len(input_ids)*0.8):]])
input_types_test = np.array([input_types[i] for i in random_order[int(len(input_ids)*0.8):]])
input_masks_test = np.array([input_masks[i] for i in random_order[int(len(input_ids)*0.8):]])
y_test = np.array([label[i] for i in random_order[int(len(input_ids) * 0.8):]])
print(input_ids_test.shape, input_types_test.shape, input_masks_test.shape, y_test.shape)
BATCH_SIZE = 64
train_data = TensorDataset(torch.LongTensor(input_ids_train),
torch.LongTensor(input_types_train),
torch.LongTensor(input_masks_train),
torch.LongTensor(y_train))
train_sampler = RandomSampler(train_data)
train_loader = DataLoader(train_data, sampler=train_sampler, batch_size=BATCH_SIZE)
test_data = TensorDataset(torch.LongTensor(input_ids_test),
torch.LongTensor(input_types_test),
torch.LongTensor(input_masks_test),
torch.LongTensor(y_test))
test_sampler = SequentialSampler(test_data)
test_loader = DataLoader(test_data, sampler=test_sampler, batch_size=BATCH_SIZE)
class Model(nn.Module):
def __init__(self):
super(Model, self).__init__()
self.bert = BertModel.from_pretrained(bert_path) # /bert_pretrain/
for param in self.bert.parameters():
param.requires_grad = True # 每个参数都要 求梯度
self.fc = nn.Linear(768, 6) # 768 -> 6
def forward(self, x): # (ids, seq_len, mask)
context = x[0] # 输入的句子
types = x[1]
mask = x[2] # 对padding部分进行mask,和句子相同size,padding部分用0表示,如:[1, 1, 1, 1, 0, 0]
_, pooled = self.bert(context, token_type_ids=types, attention_mask=mask)
# print(_.shape, pooled.shape) # torch.Size([128, 32, 768]) torch.Size([128, 768])
# print(_[0,0] == pooled[0]) # False 注意是不一样的 pooled再加了一层dense和activation
out = self.fc(pooled) # 得到6分类
return out
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Model().to(DEVICE)
print(model)
# param_optimizer = list(model.named_parameters()) # 模型参数名字列表
# no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
# optimizer_grouped_parameters = [
# {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
# {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}]
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
NUM_EPOCHS = 1
def train(model, device, train_loader, optimizer, epoch): # 训练模型
model.train()
best_acc = 0.0
for batch_idx, (x1, x2, x3, y) in enumerate(train_loader):
start_time = time.time()
x1, x2, x3, y = x1.to(device), x2.to(device), x3.to(device), y.to(device)
y_pred = model([x1, x2, x3]) # 得到预测结果
optimizer.zero_grad() # 梯度清零
loss = F.cross_entropy(y_pred, y.squeeze()) # 得到loss
loss.backward()
optimizer.step()
if(batch_idx + 1) % 100 == 0: # 打印loss
print('Train Epoch: {} [{}/{} ({:.2f}%)]\tLoss: {:.6f}'.format(epoch, (batch_idx+1) * len(x1),
len(train_loader.dataset),
100. * batch_idx / len(train_loader),
loss.item())) # 记得为loss.item()
def test(model, device, test_loader): # 测试模型, 得到测试集评估结果
model.eval()
test_loss = 0.0
acc = 0
for batch_idx, (x1, x2, x3, y) in enumerate(test_loader):
x1, x2, x3, y = x1.to(device), x2.to(device), x3.to(device), y.to(device)
with torch.no_grad():
y_ = model([x1,x2,x3])
test_loss += F.cross_entropy(y_, y.squeeze())
pred = y_.max(-1, keepdim=True)[1] # .max(): 2输出,分别为最大值和最大值的index
acc += pred.eq(y.view_as(pred)).sum().item() # 记得加item()
test_loss /= len(test_loader)
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
test_loss, acc, len(test_loader.dataset),
100. * acc / len(test_loader.dataset)))
return acc / len(test_loader.dataset)
best_acc = 0.0
PATH = 'roberta_model.pth' # 定义模型保存路径
for epoch in range(1, NUM_EPOCHS+1): # 1个epoch
train(model, DEVICE, train_loader, optimizer, epoch)
acc = test(model, DEVICE, test_loader)
if best_acc < acc:
best_acc = acc
torch.save(model.state_dict(), PATH) # 保存最优模型
print("acc is: {:.4f}, best acc is {:.4f}\n".format(acc, best_acc))
model.load_state_dict(torch.load(PATH))
acc = test(model, DEVICE, test_loader)
donate the author