
BERT Text Classification

This post uses a pretrained BERT model from Hugging Face for text classification.

The model used here is RoBERTa-wwm-ext; see https://github.com/ymcui/Chinese-BERT-wwm for how to obtain and load it. Because it is pretrained with Whole Word Masking, it performs somewhat better than plain BERT.
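Since the weights are also published on the Hugging Face hub under hfl/chinese-roberta-wwm-ext (the same name the training script below uses), a minimal loading sketch looks like this; note that, as the Chinese-BERT-wwm repo points out, the RoBERTa-wwm weights are loaded with the Bert* classes rather than the Roberta* ones:

from transformers import BertModel, BertTokenizer

# RoBERTa-wwm-ext is loaded with the Bert* classes, not the Roberta* ones.
bert_path = "hfl/chinese-roberta-wwm-ext"
tokenizer = BertTokenizer.from_pretrained(bert_path)
bert = BertModel.from_pretrained(bert_path)

print(bert.config.hidden_size)  # 768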

The dataset is train.txt from THUCNews: https://github.com/649453932/Bert-Chinese-Text-Classification-Pytorch/tree/master/THUCNews/data. It is a ten-class classification task, with one tab-separated news title and label id per line, for example:

中华女子学院:本科层次仅1专业招男生 3
两天价网站背后重重迷雾:做个网站究竟要多少钱 4
东5环海棠公社230-290平2居准现房98折优惠 1
卡佩罗:告诉你德国脚生猛的原因 不希望英德战踢点球 7
82岁老太为学生做饭扫地44年获授港大荣誉院士 5
记者回访地震中可乐男孩:将受邀赴美国参观 5
冯德伦徐若�隔空传情 默认其是女友 9
传郭晶晶欲落户香港战伦敦奥运 装修别墅当婚房 1
《赤壁OL》攻城战诸侯战硝烟又起 8
“手机钱包”亮相科博会 4
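
The padding length used below (pad_size = 32) comes from a quick statistic over the title lengths, mentioned in the script's comments. A rough character-level check of that claim could look like the following sketch (it assumes train.txt sits in the working directory; for Chinese text, BERT's WordPiece tokens are mostly single characters, so character counts are a close proxy):

import numpy as np

# Count characters per title in train.txt (one "title<TAB>label" pair per line).
lengths = []
with open("train.txt", encoding="utf-8") as f:
    for line in f:
        text, _ = line.strip().split("\t")
        lengths.append(len(text))

lengths = np.array(lengths)
print("max length:", lengths.max())                    # the script's comment reports a max of 38
print("99th percentile:", np.percentile(lengths, 99))  # roughly 32, hence pad_size = 32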

The full training code is as follows:

import os
import time

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from tqdm import tqdm
from transformers import BertModel, BertTokenizer

os.environ["CUDA_VISIBLE_DEVICES"] = "1"

path = "./"
bert_path = "hfl/chinese-roberta-wwm-ext"
tokenizer = BertTokenizer.from_pretrained(bert_path)  # initialize the tokenizer

input_ids = []     # input token ids
input_types = []   # segment ids
input_masks = []   # attention mask
label = []         # labels
pad_size = 32      # a.k.a. max_len (a prior length analysis showed the longest title
                   # has 38 characters, so 32 already covers ~99% of the samples)

with open(path + "train.txt", encoding='utf-8') as f:
    for i, l in tqdm(enumerate(f)):
        x1, y = l.strip().split('\t')
        x1 = tokenizer.tokenize(x1)
        tokens = ["[CLS]"] + x1 + ["[SEP]"]

        # build input ids, segment ids and attention mask
        ids = tokenizer.convert_tokens_to_ids(tokens)
        types = [0] * len(ids)
        masks = [1] * len(ids)

        # pad short sequences, truncate long ones
        if len(ids) < pad_size:
            types = types + [1] * (pad_size - len(ids))  # segment ids of the padded positions are set to 1
            masks = masks + [0] * (pad_size - len(ids))
            ids = ids + [0] * (pad_size - len(ids))
        else:
            types = types[:pad_size]
            masks = masks[:pad_size]
            ids = ids[:pad_size]

        input_ids.append(ids)
        input_types.append(types)
        input_masks.append(masks)
        assert len(ids) == len(masks) == len(types) == pad_size
        label.append([int(y)])

# shuffle the indices with a fixed seed
random_order = list(range(len(input_ids)))
np.random.seed(2020)
np.random.shuffle(random_order)

# 80/20 train/test split
input_ids_train = np.array([input_ids[i] for i in random_order[:int(len(input_ids) * 0.8)]])
input_types_train = np.array([input_types[i] for i in random_order[:int(len(input_ids) * 0.8)]])
input_masks_train = np.array([input_masks[i] for i in random_order[:int(len(input_ids) * 0.8)]])
y_train = np.array([label[i] for i in random_order[:int(len(input_ids) * 0.8)]])
print(input_ids_train.shape, input_types_train.shape, input_masks_train.shape, y_train.shape)

input_ids_test = np.array([input_ids[i] for i in random_order[int(len(input_ids) * 0.8):]])
input_types_test = np.array([input_types[i] for i in random_order[int(len(input_ids) * 0.8):]])
input_masks_test = np.array([input_masks[i] for i in random_order[int(len(input_ids) * 0.8):]])
y_test = np.array([label[i] for i in random_order[int(len(input_ids) * 0.8):]])
print(input_ids_test.shape, input_types_test.shape, input_masks_test.shape, y_test.shape)

BATCH_SIZE = 128
train_data = TensorDataset(torch.LongTensor(input_ids_train),
                           torch.LongTensor(input_types_train),
                           torch.LongTensor(input_masks_train),
                           torch.LongTensor(y_train))
train_sampler = RandomSampler(train_data)
train_loader = DataLoader(train_data, sampler=train_sampler, batch_size=BATCH_SIZE)

test_data = TensorDataset(torch.LongTensor(input_ids_test),
                          torch.LongTensor(input_types_test),
                          torch.LongTensor(input_masks_test),
                          torch.LongTensor(y_test))
test_sampler = SequentialSampler(test_data)
test_loader = DataLoader(test_data, sampler=test_sampler, batch_size=BATCH_SIZE)


class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.bert = BertModel.from_pretrained(bert_path)
        for param in self.bert.parameters():
            param.requires_grad = True  # fine-tune every BERT parameter
        self.fc = nn.Linear(768, 10)    # 768 -> 10

    def forward(self, x):  # x = (ids, types, mask)
        context = x[0]  # token ids of the input sentence
        types = x[1]    # segment ids
        mask = x[2]     # attention mask over the padding: same size as the sentence, 0 for padded positions, e.g. [1, 1, 1, 1, 0, 0]
        # return_dict=False keeps the (sequence_output, pooled_output) tuple on transformers >= 4.0
        _, pooled = self.bert(context, token_type_ids=types, attention_mask=mask, return_dict=False)
        # print(_.shape, pooled.shape)  # torch.Size([128, 32, 768]) torch.Size([128, 768])
        # print(_[0, 0] == pooled[0])   # False: pooled goes through an extra dense layer and activation
        out = self.fc(pooled)  # project to the 10 classes
        return out


DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Model().to(DEVICE)
print(model)

# param_optimizer = list(model.named_parameters())  # list of named model parameters
# no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
# optimizer_grouped_parameters = [
#     {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
#     {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}]
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
NUM_EPOCHS = 3


def train(model, device, train_loader, optimizer, epoch):  # train the model
    model.train()
    for batch_idx, (x1, x2, x3, y) in enumerate(train_loader):
        start_time = time.time()
        x1, x2, x3, y = x1.to(device), x2.to(device), x3.to(device), y.to(device)
        y_pred = model([x1, x2, x3])  # forward pass
        optimizer.zero_grad()         # clear gradients
        loss = F.cross_entropy(y_pred, y.squeeze())  # compute the loss
        loss.backward()
        optimizer.step()
        if (batch_idx + 1) % 100 == 0:  # log the loss every 100 batches
            print('Train Epoch: {} [{}/{} ({:.2f}%)]\tLoss: {:.6f}'.format(epoch, (batch_idx + 1) * len(x1),
                                                                           len(train_loader.dataset),
                                                                           100. * batch_idx / len(train_loader),
                                                                           loss.item()))  # remember loss.item()


def test(model, device, test_loader):  # evaluate the model on the test set
    model.eval()
    test_loss = 0.0
    acc = 0
    for batch_idx, (x1, x2, x3, y) in enumerate(test_loader):
        x1, x2, x3, y = x1.to(device), x2.to(device), x3.to(device), y.to(device)
        with torch.no_grad():
            y_ = model([x1, x2, x3])
        test_loss += F.cross_entropy(y_, y.squeeze()).item()
        pred = y_.max(-1, keepdim=True)[1]  # .max() returns (values, indices); take the argmax
        acc += pred.eq(y.view_as(pred)).sum().item()  # remember .item()
    test_loss /= len(test_loader)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
        test_loss, acc, len(test_loader.dataset),
        100. * acc / len(test_loader.dataset)))
    return acc / len(test_loader.dataset)


best_acc = 0.0
PATH = 'roberta_model.pth'  # path to save the model
for epoch in range(1, NUM_EPOCHS + 1):  # 3 epochs
    train(model, DEVICE, train_loader, optimizer, epoch)
    acc = test(model, DEVICE, test_loader)
    if best_acc < acc:
        best_acc = acc
        torch.save(model.state_dict(), PATH)  # save the best model so far
    print("acc is: {:.4f}, best acc is {:.4f}\n".format(acc, best_acc))

model.load_state_dict(torch.load(PATH))
acc = test(model, DEVICE, test_loader)

On a 2080 Ti, one training epoch takes roughly three minutes, and the test accuracy already exceeds 94% after a single epoch.
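Once roberta_model.pth has been saved, running inference on a new headline only requires repeating the same preprocessing. The sketch below reuses tokenizer, pad_size, model, DEVICE and PATH from the script above; the example title and its expected label come from the sample data shown earlier:

def predict(text, model, tokenizer):
    # Tokenize and pad exactly as during training.
    tokens = ["[CLS]"] + tokenizer.tokenize(text) + ["[SEP]"]
    ids = tokenizer.convert_tokens_to_ids(tokens)
    types = [0] * len(ids)
    masks = [1] * len(ids)
    if len(ids) < pad_size:
        types += [1] * (pad_size - len(ids))
        masks += [0] * (pad_size - len(ids))
        ids += [0] * (pad_size - len(ids))
    else:
        ids, types, masks = ids[:pad_size], types[:pad_size], masks[:pad_size]

    x1 = torch.LongTensor([ids]).to(DEVICE)
    x2 = torch.LongTensor([types]).to(DEVICE)
    x3 = torch.LongTensor([masks]).to(DEVICE)

    model.eval()
    with torch.no_grad():
        logits = model([x1, x2, x3])
    return logits.argmax(dim=-1).item()

model.load_state_dict(torch.load(PATH))
print(predict("中华女子学院:本科层次仅1专业招男生", model, tokenizer))  # labeled 3 in the sample data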

There are a few points worth noting about the line _, pooled = self.bert(context, token_type_ids=types, attention_mask=mask, return_dict=False):

  • context looks like [101, …, 102, 0, 0, 0, …, 0]
  • token_type_ids looks like [0, 0, 0, …, 1, 1, 1, …, 1]
  • attention_mask looks like [1, 1, 1, …, 0, 0, 0, …, 0]
  • the two returned tensors have shapes [batch_size, max_seq_len, hidden_size=768] and [batch_size, hidden_size=768]. The former contains the last-layer hidden vectors of every position; the latter is the hidden vector of [CLS] passed through one more dense layer and activation, so note in particular that _[:, 0, :] and pooled[:, :] are not the same (a quick check is sketched after the source code below). The relevant source is:
class BertPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states):
        # We "pool" the model by simply taking the hidden state corresponding
        # to the first token.
        first_token_tensor = hidden_states[:, 0]
        pooled_output = self.dense(first_token_tensor)
        pooled_output = self.activation(pooled_output)
        return pooled_output
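
A quick way to see the difference for yourself is to compare the raw [CLS] hidden state with the pooled output. A small sketch, assuming the same hfl/chinese-roberta-wwm-ext checkpoint and one of the sample titles:

import torch
from transformers import BertModel, BertTokenizer

name = "hfl/chinese-roberta-wwm-ext"
tokenizer = BertTokenizer.from_pretrained(name)
bert = BertModel.from_pretrained(name)
bert.eval()

enc = tokenizer("手机钱包亮相科博会", return_tensors="pt")
with torch.no_grad():
    sequence_output, pooled_output = bert(**enc, return_dict=False)

print(sequence_output.shape)  # [1, seq_len, 768] -- last-layer hidden states
print(pooled_output.shape)    # [1, 768]          -- [CLS] after BertPooler's dense + tanh
print(torch.allclose(sequence_output[:, 0], pooled_output))  # expected to print False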
