从代码入手 光速入门深度学习

常规深度学习流程图

常规的、多数情况下的深度学习的流程框架可以被抽象为上方这个流程图。

首先,我们准备输入数据。输入数据一般被处理为成向量,然后输入模型,输出预测。预测的结果与真实标签通过损失函数计算得出损失,最后损失反向传播,更新模型参数。

下面我们从一个示例代码开始,快速带同学们入门深度学习。

import

import config
import random, shutil, os
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import os

这是一个示例代码的import部分。这些包可能有些同学不认识我们大概来过一下:

  • matplotlib 是可视化的库,具体怎么用我们后面可能会提到
  • torch 是深度学习的框架。大家可能已经听说过了
  • torchvision 是针对于计算机视觉这一个task的库。我们后面会介绍
  • numpy pandas应该不用我介绍了,应该没人没用过吧
  • sklearn是机器学习库。需要注意的是,下载时需要使用 pip install scikit-learn ,直接使用sklearn下不下来。

设置超参数

接下来我们通常会设置超参数。超参数的设置方式也有比较多种,第一种是使用大写的变量,如:

SEED = 42
LR = 1e-4
BATCH_SIZE = 32
self.EPOCHS = 10
self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.MODEL_SAVE_PATH = 'best_model.pth'
self.DATA_DIR = './data'

也有人会喜欢这么写:

class Config:
    def __init__(self):
        self.SEED = 42
        self.LR = 1e-4
        self.BATCH_SIZE = 32
        self.EPOCHS = 10
        self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.MODEL_SAVE_PATH = 'best_model.pth'
        self.DATA_DIR = './data'    # 数据集路径
 
config = Config()

一般使用 argparse 这种方式更加专业:

import argparse
 
# 设置argparse参数解析
parser = argparse.ArgumentParser(description='深度学习训练脚本')
parser.add_argument('--seed', type=int, default=42, help='随机种子')
parser.add_argument('--lr', type=float, default=1e-4, help='学习率')
parser.add_argument('--batch_size', type=int, default=32, help='批量大小')
parser.add_argument('--epochs', type=int, default=10, help='训练轮次')
parser.add_argument('--device', type=str, default='auto', help='设备选择(auto/cuda/cpu)')
parser.add_argument('--model_save_path', type=str, default='best_model.pth', help='模型保存路径')
parser.add_argument('--data_dir', type=str, default='./data', help='数据目录')
 
args = parser.parse_args()
 
if args.device == 'auto':
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
else:
    DEVICE = torch.device(args.device)
 
# 调用时使用类似:
torch.manual_seed(args.seed)

需要用bash脚本使用和控制版本等。

也有一些方法,写外部的config文件等。管理方法很多,使用你习惯、用得顺手的即可。

设置随机种子

一般来说,在import的下一个步骤,我们会设置随机种子保证结果可复现。例如:

# 设置随机种子确保结果可复现
def set_seed(seed=42):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
 
set_seed(args.seed)

数据预处理

在数据预处理部分,我们需要将数据清洗、整理好(如果需要),分割训练集、验证集、测试集,并打包为dataset类。

训练集、验证集、测试集分割

假设你拿到的是一个完整的数据集:

train_indices, test_indices = train_test_split(
    indices,
    test_size=0.2,
    random_state=42,
    stratify=full_dataset.targets.numpy()
)
train_indices, val_indices = train_test_split(
    train_indices, 
    test_size=0.25, 
    random_state=42,
    stratify=[full_dataset.targets[i] for i in train_indices]  
)

可以选择使用索引分割的方式。索引分割对内存较为友好。当然,也可以直接加载所有数据分割数据(如果数据规模不大),但速度会变慢。你可以针对路径/索引等先进行分割。

初学者可能会经常分不清训练集、验证集和测试集的概念。这在机器学习中是极其重要的概念,最好不要搞混。

训练集(Train Set):

用于模型训练。它们会通过模型输入,输出预测标签,和真实标签计算损失,然后损失被反向传播。训练集是被模型拟合的对象。

验证集(Valid Set):

用于超参数调优、模型选择。通常来说,在训练的时候,我们会在每个epoch结束后,或者在多少个step结束后,使用验证集进行一轮验证。在这个过程中不会计算梯度,但会和真实标签计算一些指标,如准确率和Loss。通常来说,我们通过观察这个指标来确定模型训练的情况,比如其是否发生了过拟合(反映在指标上是训练Loss很小/准确率很高但验证集上Loss较高/准确率不高)。

一种常见的模型选择策略是选择在验证集上分数最高的模型。

我们还可以基于验证集的分数/Loss来确定早停策略。比如,当验证集的 Loss 在5个 epoch 上没有改善的时候,我们就会提前停止训练节省时间。

测试集(Test Set):

测试集是测试模型最终能力的“考试”。比如说,我们会拿着验证集上表现最好的模型在测试集上面跑。测试集应当真实地考验模型的泛化能力,理论上,你不可以照着测试集的指标去调参等。这种操作叫做“人工过拟合测试集”,很可能导致模型能力被错误地反映。

最好情况下,在数据充足的情境下,一个测试集最好只被使用一次。当然,在大部分情况下,这种方式不太可行。太奢侈了(

交叉验证(Cross Validation)

当数据量有限时,简单的训练/验证集分割可能导致评估结果不稳定。此时可采用交叉验证:将训练数据划分为K个子集(称为”折”),每次用K-1折训练,剩余1折验证,循环K次后取平均结果:

from sklearn.model_selection import KFold
 
kf = KFold(n_splits=5, shuffle=True, random_state=42)
for train_idx, val_idx in kf.split(dataset):
    # 每个fold的训练和验证逻辑

交叉验证主要用于:

  1. 模型性能评估(比单次分割更可靠)
  2. 超参数调优(结合网格搜索/随机搜索)
  3. 小样本数据下的模型选择

注意:交叉验证仅应用于训练阶段,最终模型仍需在独立的测试集上进行最终评估。相当于分割的时候,我们先将测试集分割出去,然后将训练集和验证集按照Cross Validation的方式进行划分。

缺点:训练时间 * k ,耗时很长。

PyTorch Dataset与DataLoader构建

Dataset

class CustomDataset(Dataset):
    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = targets
        self.transform = transform
 
    def __len__(self):
        return len(self.data)
 
    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.targets[idx]
        
        if self.transform:
            sample = self.transform(sample)
            
        return sample, label

你需要根据自己的数据格式,自行实现 Dataset 这个 class。大部分情况下,这个东西你都需要自己根据数据格式去自定义。

下面详细讲讲Class里面的每一个方法

Init 方法

    def __init__(self, data_dir, transform=None):
        """
        初始化数据集
        :param data_dir: 数据存储目录
        :param transform: 数据预处理/增强操作
        """
        self.data_dir = data_dir
        self.transform = transform
        
        # 加载数据列表(示例)
        self.samples = self._load_data()

首先是Init 方法。在Init 方法中,我们需要定义 Class 中的一些基本对象。比如,我们可以在这里定义你数据存放的路径,当然,部分任务比如图像任务中可能需要输入一个 transform 也就是数据增强或者变换的方法。

在这里,不建议真正地加载所有数据。在很多任务中,数据很大,一次性加载会占用内存,并且会导致项目启动很慢。

len 方法

接着,是 len 方法。需要正确地返回样本的长度。如果你使用 Dataset,这个方法必须实现

    def __len__(self):
        """返回数据集总样本数"""
        return len(self.samples)

在一些特殊情况下,我们的数据可能是流式的,或不方便获取长度的。此时在函数定义时,我们就不应使用Dataset,而应该使用 IterableDataset 例如:

from torch.utils.data import IterableDataset
 
class StreamDataset(IterableDataset):
    def __init__(self, data_generator):
        self.data_generator = data_generator  # 数据生成器/迭代器
 
    def __iter__(self):
        return self.data_generator()

getitem

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.root_dir, img_name)
        image = Image.open(img_path).convert('RGB')
        label = self.class_to_idx[img_name.split('_')[0]]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

在这个方法中,你需要实现实际加载数据的方法。它应当加载你数据中的一个样本。

实例化dataset

train_dataset = CustomDataset(
    filepaths=[all_filepaths[i] for i in train_indices],
    labels=[all_labels[i] for i in train_indices],
    transform=train_transform
)
 
val_dataset = CustomDataset(
    filepaths=[all_filepaths[i] for i in val_indices],
    labels=[all_labels[i] for i in val_indices],
    transform=val_transform
)
 
test_dataset = CustomDataset(
    filepaths=[all_filepaths[i] for i in test_indices],
    labels=[all_labels[i] for i in test_indices],
    transform=test_transform
)

在图像任务中,通常在 train_transform 中我们会进行数据增强,但是在 val 和 test中,我们在 transform中可能只做标准化。

DataLoader 方法

from torch.utils.data import DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

DataLoader 能够将dataset 进行打包。一般来说,我们会在数据上加上一个 batchsize 的维度。在DataLoader方法中,它还能够承担 shuffle 洗混数据的目的。

(疑难杂症:如果你写的代码中在 dataset 中使用 num_workers 了,并且在jupyter notebook 中使用,你可能会发现训练时进度条不会动。请删除 num_workers 这个参数,或不要使用jupyter notebook)

模型构建

在PyTorch中,我们通常通过继承nn.Module类来构建模型:

class SimpleCNN(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(0.5)
 
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)  # 展平处理
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

example 模型关键组件说明:

  1. 卷积层 (Conv2d):
    • 提取图像局部特征
    • 参数:输入通道数、输出通道数、卷积核大小、padding等
  2. 池化层 (MaxPool2d):
    • 降低空间维度(下采样)
    • 增强平移不变性
  3. 全连接层 (Linear):
    • 将提取的特征映射到类别空间
    • 输入维度需要根据前层输出计算
  4. Dropout层
    • 随机失活神经元防止过拟合
    • 训练时生效,测试时自动关闭
  5. 激活函数 (ReLU):
    • 引入非线性
    • 解决梯度消失问题

其余技巧或写法

残差

    def forward(self, x):
        identity = x
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        x += identity  # 残差连接
        return F.relu(x)

批量归一化

class SimpleCNN(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.BatchNorm2d(32),  # 添加BN层
            nn.ReLU()
        )

配置化模型构建

实验/参数调优时使用

def create_cnn(config):
    layers = []
    in_channels = 3
    for out_channels in config['channels']:
        layers += [
            nn.Conv2d(in_channels, out_channels, **config['conv_params']),
            nn.ReLU(),
            nn.MaxPool2d(**config['pool_params'])
        ]
        in_channels = out_channels
    return nn.Sequential(*layers)
 
config = {
    'channels': [32, 64],
    'conv_params': {'kernel_size': 3, 'padding': 1},
    'pool_params': {'kernel_size': 2, 'stride': 2}
}

自定义层实现

class AttentionLayer(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.fc = nn.Linear(channels, 1)
        
    def forward(self, x):
        weights = F.softmax(self.fc(x.view(x.size(0), x.size(1), -1)), dim=2)
        return (x.view(x.size(0), x.size(1), -1) * weights).sum(dim=2)

模型实例化

model = SimpleCNN(num_classes=10).to(DEVICE)

DEVICE 由之前的参数定义,可以为 cpu/cuda。

如果在多卡环境,可以指定显卡ID,如 DEVICE = "cuda:0"

自动多卡并行:

model = DataParallel(model)

请了解清楚后使用,否则可能会导致反效果。

损失函数

损失函数有部分常用的,可以直接调用。例如:

criterion = nn.CrossEntropyLoss()

如果有特殊需求,你也可以自行定义。示例为在类别不平衡情况下使用的 FocalLoss

class FocalLoss(nn.Module):
    def __init__(self, alpha=0.25, gamma=2.0):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
 
    def forward(self, inputs, targets):
        # 计算交叉熵损失
        BCE_loss = F.cross_entropy(inputs, targets, reduction='none')
        # 计算概率
        pt = torch.exp(-BCE_loss)
        # Focal Loss公式:FL = α(1-pt)^γ * CE
        loss = self.alpha * (1-pt)**self.gamma * BCE_loss
        return loss.mean()
 

请使用torch张量操作,确保所有操作在计算图中可导。

优化器

1. 随机梯度下降(SGD with Momentum)

原理

SGD的核心是通过负梯度方向更新参数,但引入动量(Momentum)加速收敛:

  • 参数更新公式
  • v_t:累积的动量(历史梯度的指数加权平均)
  • \beta(如0.9):动量系数,控制历史梯度的贡献
  • \eta:学习率
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

2. Adam(Adaptive Moment Estimation)

原理

Adam结合了动量(一阶矩)和自适应学习率(二阶矩):

作用

  • 自适应学习率:参数更新步长根据梯度历史动态调整(梯度大的方向步长小,反之亦然)。
  • 快速收敛:一阶矩加速方向调整,二阶矩自适应步长。

问题

  • 泛化能力:可能收敛到尖锐极小值,导致测试性能下降。
  • 偏差校正:初始阶段 偏小,需校正()。

3. AdamW

原理

AdamW改进了Adam的权重衰减(L2正则化)方式,解耦权重衰减与梯度更新:

作用

  • 解耦正则化:权重衰减直接作用于参数,而非梯度(Adam的L2正则化会与自适应学习率耦合)。
  • 提升泛化:尤其适合Transformer等复杂模型,减少过拟合。

问题

  • 学习率预热:初始阶段需配合预热策略(warmup),避免梯度不稳定。

对比:

优化器收敛速度内存消耗超参数敏感度
SGD
Adam
AdamW

为什么 Adam 会消耗更多的显存?

  1. 状态变量存储
    • Adam 维护两个状态变量:
      • :梯度的一阶矩(均值)。
      • :梯度的二阶矩(未中心化的方差)。
    • AdamW 同样维护 mt​ 和 vt​,但对权重衰减的处理方式不同,不增加额外状态变量。
  2. 参数规模的影响
    • 假设模型有 N 个参数,每个参数需要存储:
      • 当前值(param)。
      • 梯度(param.grad)。
      • 一阶矩(m)。
      • 二阶矩(v)。
    • 总计显存占用 :每个参数需要存储 4 个浮点数(SGD 只需存储 paramparam.grad)。

学习率调度器

学习率调度器通过动态调整优化器的学习率,帮助模型在训练过程中更高效地收敛,避免陷入局部最优或震荡。

这也是个玄学东西,多种多样。 比较流行的一个:CosineAnnealingLR

CosineAnnealingLR

下图的这种叫Initial_Warmup_Cosine_Annealing_With_Weight_Decay

这里就随便介绍一种,种类太多了 大家感兴趣自己搜吧

示例代码(StepLR的):

scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

调度器需要与 优化器绑定。 即: 优化器和模型绑定 调度器和优化器绑定

训练过程

其实写法也多种多样。这里放一种比较常见的的。

训练部分

def train(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for inputs, labels in tqdm(dataloader, desc="Training"):
        inputs, labels = inputs.to(device), labels.to(device) 
        
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # 反向传播与优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 统计损失和准确率
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    
    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

将数据和模型放到同一个设备上

inputs, labels = inputs.to(device), labels.to(device) 

前向传播

        outputs = model(inputs)
        loss = criterion(outputs, labels)

反向传播

第一步需要清空调度器的梯度。pytorch的默认行为是让优化器的梯度累计。

optimizer.zero_grad()

第二步是计算梯度,pytorch启动自动求导步骤

loss.backward()

第三步是 optimizer.step() 使用根据上一步算出的梯度,更新模型参数

optimizer.step()

第四步是更新学习率。它在train函数之外实现,或者在模型参数更新之后的任意位置更新就可以。如果在每个epoch之内调用,就是每个step更新一次;如果在每个epoch外更新,就是每个epoch更新一次。

scheduler.step()

计算训练准确率等

        # 统计损失和准确率
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    
    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

验证部分

验证部分可以让我们实时地在训练过程中知道模型性能,以方便进行参数调优等。 例如你发现训练准确率和验证准确率差距过大,那么很可能模型就发生了过拟合。

def validate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc="Validating"):
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            running_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    
    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

在测试的部分,我们首先要开启 model.eval()

这告诉pytorch,在这个函数中下面的部分不需要计算梯度。

quiz: 假设我们使用了 Adam优化器,这里我们模型所需要使用的参数量是训练时候的多少分之一?

由于使用的内存减少了,在validation和test的时候,我们batch size也可以开大一些。

测试代码比较简单,只需要前向传播就可以了。

训练循环

# 初始化最佳模型参数
best_val_loss = float('inf')
patience = 5  # 早停耐心值
counter = 0
 
# 训练历史记录
history = {
    'train_loss': [],
    'train_acc': [],
    'val_loss': [],
    'val_acc': []
}
 
for epoch in range(args.epochs):
    print(f"\nEpoch {epoch+1}/{args.epochs}")
    
    # 训练阶段
    train_loss, train_acc = train(
        model, 
        train_loader, 
        criterion, 
        optimizer, 
        DEVICE
    )
    
    # 验证阶段
    val_loss, val_acc = validate(
        model, 
        val_loader, 
        criterion, 
        DEVICE
    )
    
    # 记录训练历史
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    
    # 早停机制
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0
        torch.save(model.state_dict(), args.model_save_path)
        print("Model saved.")
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered.")
            break
            
    # 打印当前状态
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

这里设置了早停代码。

测试、验证和可视化

自己做吧!

记录保存

保存实验记录十分重要! python提供了多种保存记录的方式,比如内置的logger

这里不教具体实现了,但大家做实验的时候一定要详细地保存。你可以保存的东西:

  • 训练参数
  • 训练结果
  • 模型的架构、实现
  • 使用的数据集
  • 实验时间

最好详细地保存所有过程中的记录。如果机器上的空间允许,也最好多多保存模型。