因为K折交叉验证执行一次训练的总轮数是每一折的训练轮数(epochs)与总折数(K)的乘积,因此训练的成本会翻倍。
2.4 K折交叉验证的代码
import torch
import random
from torch.utils.data import DataLoader, TensorDataset
from Model.ReconsModel.Recoder import ReconsModel, Loss_function
from Model.ModelConfig import ModelConfig
# 返回第 i+1 折(i取 0 ~ k-1)的训练集(train)与验证集(valid)
def get_Kfold_data(k, i, x): # k是折数,取第i+1折,x是特征数据
fold_size = x.size(0) // k # 计算每一折中的数据数量
val_start = i * fold_size # 第 i+1折 数据的测试集初始数据编号
if i != k - 1: # 不是最后一折的话,数据的分配策略
val_end = (i + 1) * fold_size # 验证集的结束
valid_data = x[val_start: val_end]
train_data = torch.cat((x[0: val_start], x[val_end:]), dim=0)
else: # 如果是最后一折,数据的分配策略,主要涉及到不能K整除时,多出的数据如何处理
valid_data = x[val_start:] # 实际上,多出来的样本,都放在最后一折里了
train_data = x[0: val_start]
return train_data, valid_data
# k折交叉验证,某一折的训练
def train(model, train_data, valid_data, batch_size, lr,epochs):
# 数据准备
train_loader = DataLoader(TensorDataset(train_data), batch_size, shuffle=True)
valid_loader = DataLoader(TensorDataset(valid_data), batch_size, shuffle=True)
# 损失函数,优化函数的准备
criterion = Loss_function()
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)
# 记录每一个epoch的平均损失
train_loss = []
valid_loss = []
for epoch in range(epochs):
tra_loss = 0
val_loss = 0
for i , data in enumerate(train_loader):
# 假设数据的处理 此时的data是list类型的数据,转化成Tensor,并且把多出来的第0维去掉
data = torch.stack(data)
data = data.squeeze(0)
optimizer.zero_grad() # 梯度清零
recon, mu, log_std = model(data, if_train=True) # if_train不能少
# 计算损失
loss = criterion.loss_function(recon, data, mu, log_std)
# 反向传播
loss.backward()
optimizer.step()
tra_loss = tra_loss + loss.item()
tra_loss = tra_loss / len(train_data)
train_loss.append(tra_loss)
# 计算测试集损失
with torch.no_grad():
for i, data in enumerate(valid_loader):
# 假设数据的处理 此时的data是list类型的数据,转化成Tensor,并且把多出来的第0维去掉
data = torch.stack(data)
data = data.squeeze(0)
optimizer.zero_grad()
recon, mu, log_std = model(data, if_train=False)
test_loss = criterion.loss_function(recon, data, mu, log_std).item()
val_loss = val_loss + test_loss
val_loss = val_loss / len(valid_data)
valid_loss.append(val_loss)
print('第 %d 轮, 训练的平均误差为%.3f, 测试的平均误差为%.3f 。'%(epoch+1, tra_loss, val_loss))
return train_loss, valid_loss
# k折交叉验证
def k_test(config, datas): # k是总折数,
valid_loss_sum = 0
for i in range(config.k):
model = ReconsModel(config) # 细节,每一折,并不是在上一折训练好的模型基础上继续训练,而是重新训练
print('-'*25,'第',i+1,'折','-'*25)
train_data , valid_data = get_Kfold_data(config.k, i, datas) # 获取某一折的训练数据、测试数据
train_loss, valid_loss = train(model, train_data, valid_data, config.batch_size, config.lr, config.epochs)
# 求某一折的平均损失
train_loss_ave = sum(train_loss)/len(train_loss)
valid_loss_ave = sum(valid_loss)/len(valid_loss)
print('-*-*-*- 第 %d 折, 平均训练损失%.3f,平均检验损失%.3f -*-*-*-'%(i+1, train_loss_ave,valid_loss_ave))
valid_loss_sum = valid_loss_sum + valid_loss_ave
valid_loss_k_ave = valid_loss_sum / config.k # 基于K折交叉验证的验证损失
print('*' * 60, )
print('基于K折交叉验证的验证损失为%.4f'%valid_loss_k_ave)
if __name__ == "__main__":
# 创建数据集,或者说数据集只要是这样的形式即可
X = torch.rand(5000, 16, 38) # 5000条数据,,每条有16个时间步,每步38个特征,时序数据
# 随机打乱
index = [i for i in range(len(X))]
random.shuffle(index)
X = X[index] # 要是有标签的话,index要对得上
config = ModelConfig()
config.load('./Model/config.json')
k_test(config, X)