Quantizing YOLOv3 with Vitis AI (v3.0)


1. Environment setup


 1) Use GitHub - ultralytics/yolov3 at the v9.6.0 tag.

 2) Train on your own dataset, then evaluate and run inference with it to verify the float model (a sample command sketch follows below).
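As a rough sketch (the dataset YAML, image size, and epoch count are placeholders, not values from this post), the v9.6.0 repo is typically driven like this:

git clone -b v9.6.0 https://github.com/ultralytics/yolov3.git
cd yolov3 && pip install -r requirements.txt
# train on a custom dataset (data/custom.yaml is a hypothetical dataset description)
python train.py --data data/custom.yaml --weights yolov3.pt --img 640 --epochs 100
# evaluate, then run inference with the trained weights
python val.py --data data/custom.yaml --weights runs/train/exp/weights/best.pt
python detect.py --weights runs/train/exp/weights/best.pt --source path/to/images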


2. Configure the quantization flow

 1) Download Vitis AI 3.0.

 2) Move the code your YOLOv3 inference needs into Vitis-AI-3.0\examples\vai_quantizer\pytorch.

3. Set up VS Code + WSL + Ubuntu (the Windows 11/10 subsystem)

    so that VS Code can operate the Ubuntu environment

4. Quantization preparation

1) In VS Code, open the integrated terminal with Ctrl+~.

2) Run sudo -i to gain root privileges.

3) cd into Vitis-AI-3.0/examples/vai_quantizer/pytorch.

4) Pull the Vitis AI image from Docker Hub: https://hub.docker.com/search?q=vitis-ai

5) Then run conda activate pytorch (see the command sketch after this list).
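A minimal command sketch, assuming the official PyTorch CPU image (pick the tag and CPU/GPU variant that match your setup; note that inside the official 3.0 image the environment may be named vitis-ai-pytorch rather than pytorch):

docker pull xilinx/vitis-ai-pytorch-cpu:latest
cd Vitis-AI-3.0
./docker_run.sh xilinx/vitis-ai-pytorch-cpu:latest   # helper script shipped in the repo root
conda activate pytorch   # or 'conda activate vitis-ai-pytorch' in the official image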

5. Quantization code (based on the resnet18 QAT example shipped with Vitis AI; the same flow is reused for YOLOv3):

import argparse
import os
import shutil
import time

import torch
import torch.nn as nn
import torch.optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms

from pytorch_nndct import nn as nndct_nn
from pytorch_nndct.nn.modules import functional
from pytorch_nndct import QatProcessor

parser = argparse.ArgumentParser()
parser.add_argument(
    '--data_dir',
    default='/group/dataset/imagenet/pytorch',
    help='Data set directory.')
parser.add_argument(
    '--pretrained',
    default='/group/modelzoo/torch_models/resnet18-5c106cde.pth',
    help='Pre-trained model file path.')
parser.add_argument(
    '--workers',
    default=4,
    type=int,
    help='Number of data loading workers to be used.')
parser.add_argument('--epochs', default=3, type=int, help='Training epochs.')
parser.add_argument(
    '--quantizer_lr',
    default=1e-2,
    type=float,
    help='Initial learning rate of quantizer.')
parser.add_argument(
    '--quantizer_lr_decay',
    default=0.5,
    type=float,  # was type=int, which would reject fractional decay ratios
    help='Learning rate decay ratio of quantizer.')
parser.add_argument(
    '--weight_lr',
    default=1e-5,
    type=float,
    help='Initial learning rate of network weights.')
parser.add_argument(
    '--weight_lr_decay',
    default=0.94,
    type=float,  # was type=int, which would reject fractional decay ratios
    help='Learning rate decay ratio of network weights.')
parser.add_argument(
    '--train_batch_size', default=24, type=int, help='Batch size for training.')
parser.add_argument(
    '--val_batch_size',
    default=100,
    type=int,
    help='Batch size for validation.')
parser.add_argument(
    '--weight_decay', default=1e-4, type=float, help='Weight decay.')
parser.add_argument(
    '--display_freq',
    default=100,
    type=int,
    help='Display training metrics every n steps.')
parser.add_argument(
    '--val_freq', default=1000, type=int, help='Validate model every n steps.')
parser.add_argument(
    '--quantizer_norm',
    default=True,
    type=bool,
    help='Use normalization for quantizer.')
parser.add_argument(
    '--mode',
    default='train',
    choices=['train', 'deploy'],
    help='Running mode.')
parser.add_argument(
    '--save_dir',
    default='./qat_models',
    help='Directory to save trained models.')
parser.add_argument(
    '--output_dir', default='qat_result', help='Directory to save qat result.')

args, _ = parser.parse_known_args()

def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
  """3x3 convolution with padding"""
  return nn.Conv2d(
      in_planes,
      out_planes,
      kernel_size=3,
      stride=stride,
      padding=dilation,
      groups=groups,
      bias=False,
      dilation=dilation)

def conv1x1(in_planes, out_planes, stride=1):
  """1x1 convolution"""
  return nn.Conv2d(
      in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

class BasicBlock(nn.Module):
  expansion = 1

  def __init__(self,
               inplanes,
               planes,
               stride=1,
               downsample=None,
               groups=1,
               base_width=64,
               dilation=1,
               norm_layer=None):
    super(BasicBlock, self).__init__()
    if norm_layer is None:
      norm_layer = nn.BatchNorm2d
    if groups != 1 or base_width != 64:
      raise ValueError('BasicBlock only supports groups=1 and base_width=64')
    if dilation > 1:
      raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
    # Both self.conv1 and self.downsample layers downsample the input when stride != 1
    self.conv1 = conv3x3(inplanes, planes, stride)
    self.bn1 = norm_layer(planes)
    self.relu1 = nn.ReLU(inplace=True)
    self.conv2 = conv3x3(planes, planes)
    self.bn2 = norm_layer(planes)
    self.downsample = downsample
    self.stride = stride
    self.skip_add = functional.Add()
    self.relu2 = nn.ReLU(inplace=True)

  def forward(self, x):
    identity = x  # was misspelled 'idEntity', a NameError whenever downsample is None
    out = self.conv1(x)
    out = self.bn1(out)
    out = self.relu1(out)
    out = self.conv2(out)
    out = self.bn2(out)
    if self.downsample is not None:
      identity = self.downsample(x)
    out = self.skip_add(out, identity)
    out = self.relu2(out)
    return out

class Bottleneck(nn.Module):
  expansion = 4

  def __init__(self,
               inplanes,
               planes,
               stride=1,
               downsample=None,
               groups=1,
               base_width=64,
               dilation=1,
               norm_layer=None):
    super(Bottleneck, self).__init__()
    if norm_layer is None:
      norm_layer = nn.BatchNorm2d
    width = int(planes * (base_width / 64.)) * groups
    # Both self.conv2 and self.downsample layers downsample the input when stride != 1
    self.conv1 = conv1x1(inplanes, width)
    self.bn1 = norm_layer(width)
    self.conv2 = conv3x3(width, width, stride, groups, dilation)
    self.bn2 = norm_layer(width)
    self.conv3 = conv1x1(width, planes * self.expansion)
    self.bn3 = norm_layer(planes * self.expansion)
    self.relu1 = nn.ReLU(inplace=True)
    self.downsample = downsample
    self.stride = stride
    self.skip_add = functional.Add()
    self.relu2 = nn.ReLU(inplace=True)
    self.relu3 = nn.ReLU(inplace=True)

  def forward(self, x):
    identity = x
    out = self.conv1(x)
    out = self.bn1(out)
    out = self.relu1(out)
    out = self.conv2(out)
    out = self.bn2(out)
    out = self.relu2(out)
    out = self.conv3(out)
    out = self.bn3(out)
    if self.downsample is not None:
      identity = self.downsample(x)
    # The original code was:
    #   out += identity
    # Replace '+=' with an Add module because we want to quantize the add op.
    out = self.skip_add(out, identity)
    out = self.relu3(out)
    return out

class ResNet(nn.Module):

  def __init__(self,
               block,
               layers,
               num_classes=1000,
               zero_init_residual=False,
               groups=1,
               width_per_group=64,
               replace_stride_with_dilation=None,
               norm_layer=None):
    super(ResNet, self).__init__()
    if norm_layer is None:
      norm_layer = nn.BatchNorm2d
    self._norm_layer = norm_layer
    self.inplanes = 64
    self.dilation = 1
    if replace_stride_with_dilation is None:
      # each element in the tuple indicates if we should replace
      # the 2x2 stride with a dilated convolution instead
      replace_stride_with_dilation = [False, False, False]
    if len(replace_stride_with_dilation) != 3:
      raise ValueError(
          "replace_stride_with_dilation should be None "
          "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
    self.groups = groups
    self.base_width = width_per_group
    self.conv1 = nn.Conv2d(
        3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = norm_layer(self.inplanes)
    self.relu = nn.ReLU(inplace=True)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
    self.layer1 = self._make_layer(block, 64, layers[0])
    self.layer2 = self._make_layer(
        block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
    self.layer3 = self._make_layer(
        block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
    self.layer4 = self._make_layer(
        block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
    self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
    self.fc = nn.Linear(512 * block.expansion, num_classes)
    self.quant_stub = nndct_nn.QuantStub()
    self.dequant_stub = nndct_nn.DeQuantStub()
    for m in self.modules():
      if isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
      elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
        nn.init.constant_(m.weight, 1)
        nn.init.constant_(m.bias, 0)
    # Zero-initialize the last BN in each residual branch,
    # so that the residual branch starts with zeros, and each residual block behaves like an identity.
    # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
    if zero_init_residual:
      for m in self.modules():
        if isinstance(m, Bottleneck):
          nn.init.constant_(m.bn3.weight, 0)
        elif isinstance(m, BasicBlock):
          nn.init.constant_(m.bn2.weight, 0)

  def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
    norm_layer = self._norm_layer
    downsample = None
    previous_dilation = self.dilation
    if dilate:
      self.dilation *= stride
      stride = 1
    if stride != 1 or self.inplanes != planes * block.expansion:
      downsample = nn.Sequential(
          conv1x1(self.inplanes, planes * block.expansion, stride),
          norm_layer(planes * block.expansion),
      )
    layers = []
    layers.append(
        block(self.inplanes, planes, stride, downsample, self.groups,
              self.base_width, previous_dilation, norm_layer))
    self.inplanes = planes * block.expansion
    for _ in range(1, blocks):
      layers.append(
          block(
              self.inplanes,
              planes,
              groups=self.groups,
              base_width=self.base_width,
              dilation=self.dilation,
              norm_layer=norm_layer))
    return nn.Sequential(*layers)

  def forward(self, x):
    x = self.quant_stub(x)
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)
    x = self.avgpool(x)
    x = torch.flatten(x, 1)
    x = self.fc(x)
    x = self.dequant_stub(x)
    return x

def _resnet(arch, block, layers, pretrained, progress, **kwargs):
  model = ResNet(block, layers, **kwargs)
  if pretrained:
    model.load_state_dict(torch.load(args.pretrained))
  return model

def resnet18(pretrained=False, progress=True, **kwargs):
  r"""ResNet-18 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/abs/1512.03385>`_

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
  return _resnet('resnet18', BasicBlock, [2, 2, 2, 2], pretrained, progress,
                 **kwargs)

def train_one_step(model, inputs, criterion, optimizer, step, gpu=None):
  # switch to train mode
  model.train()
  images, target = inputs
  if gpu is not None:
    model = model.cuda(gpu)
    images = images.cuda(gpu, non_blocking=True)
    target = target.cuda(gpu, non_blocking=True)

  # compute output
  output = model(images)
  loss = criterion(output, target)
  l2_decay = 1e-4
  l2_norm = 0.0
  for param in model.quantizer_parameters():
    l2_norm += torch.pow(param, 2.0)[0]
  if args.quantizer_norm:
    loss += l2_decay * torch.sqrt(l2_norm)

  # measure accuracy and record loss
  acc1, acc5 = accuracy(output, target, topk=(1, 5))

  # compute gradient and do SGD step
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()
  return loss, acc1, acc5

def validate(val_loader, model, criterion, gpu):
  batch_time = AverageMeter('Time', ':6.3f')
  losses = AverageMeter('Loss', ':.4e')
  top1 = AverageMeter('Acc@1', ':6.2f')
  top5 = AverageMeter('Acc@5', ':6.2f')
  progress = ProgressMeter(
      len(val_loader), [batch_time, losses, top1, top5], prefix='Test: ')

  # switch to evaluate mode
  model.eval()
  with torch.no_grad():
    end = time.time()
    for i, (images, target) in enumerate(val_loader):
      if gpu is not None:
        model = model.cuda(gpu)
        images = images.cuda(gpu, non_blocking=True)
        target = target.cuda(gpu, non_blocking=True)

      # compute output
      output = model(images)
      loss = criterion(output, target)

      # measure accuracy and record loss
      acc1, acc5 = accuracy(output, target, topk=(1, 5))
      losses.update(loss.item(), images.size(0))
      top1.update(acc1[0], images.size(0))
      top5.update(acc5[0], images.size(0))

      # measure elapsed time
      batch_time.update(time.time() - end)
      end = time.time()
      if i % 50 == 0:
        progress.display(i)

    # TODO: this should also be done with the ProgressMeter
    print(' * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}'.format(
        top1=top1, top5=top5))
  return top1.avg

def mkdir_if_not_exist(x):
  if not x or os.path.isdir(x):
    return
  os.mkdir(x)
  if not os.path.isdir(x):
    raise RuntimeError("Failed to create dir %r" % x)

def save_checkpoint(state, is_best, directory):
  mkdir_if_not_exist(directory)
  filepath = os.path.join(directory, 'model.pth')
  torch.save(state, filepath)
  if is_best:
    best_acc1 = state['best_acc1'].item()
    best_filepath = os.path.join(directory, 'model_best_%5.3f.pth' % best_acc1)
    shutil.copyfile(filepath, best_filepath)
    print('Saving best ckpt to {}, acc1: {}'.format(best_filepath, best_acc1))

class AverageMeter(object):
  """Computes and stores the average and current value"""

  def __init__(self, name, fmt=':f'):
    self.name = name
    self.fmt = fmt
    self.reset()

  def reset(self):
    self.val = 0
    self.avg = 0
    self.sum = 0
    self.count = 0

  def update(self, val, n=1):
    self.val = val
    self.sum += val * n
    self.count += n
    self.avg = self.sum / self.count

  def __str__(self):
    fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
    return fmtstr.format(**self.__dict__)

class ProgressMeter(object):

  def __init__(self, num_batches, meters, prefix=""):
    self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
    self.meters = meters
    self.prefix = prefix

  def display(self, batch):
    entries = [self.prefix + self.batch_fmtstr.format(batch)]
    entries += [str(meter) for meter in self.meters]
    print('\t'.join(entries))

  def _get_batch_fmtstr(self, num_batches):
    num_digits = len(str(num_batches // 1))
    fmt = '{:' + str(num_digits) + 'd}'
    return '[' + fmt + '/' + fmt.format(num_batches) + ']'

def adjust_learning_rate(optimizer, epoch, step):
  """Sets the learning rate to the initial LR decayed by decay ratios"""
  weight_lr_decay_steps = 3000 * (24 / args.train_batch_size)
  quantizer_lr_decay_steps = 1000 * (24 / args.train_batch_size)
  for param_group in optimizer.param_groups:
    group_name = param_group['name']
    if group_name == 'weight' and step % weight_lr_decay_steps == 0:
      lr = args.weight_lr * (
          args.weight_lr_decay**(step / weight_lr_decay_steps))
      param_group['lr'] = lr
      print('Adjust lr at epoch {}, step {}: group_name={}, lr={}'.format(
          epoch, step, group_name, lr))
    if group_name == 'quantizer' and step % quantizer_lr_decay_steps == 0:
      lr = args.quantizer_lr * (
          args.quantizer_lr_decay**(step / quantizer_lr_decay_steps))
      param_group['lr'] = lr
      print('Adjust lr at epoch {}, step {}: group_name={}, lr={}'.format(
          epoch, step, group_name, lr))

def accuracy(output, target, topk=(1,)):
  """Computes the accuracy over the k top predictions for the specified values of k"""
  with torch.no_grad():
    maxk = max(topk)
    batch_size = target.size(0)
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))
    res = []
    for k in topk:
      correct_k = correct[:k].flatten().float().sum(0, keepdim=True)
      res.append(correct_k.mul_(100.0 / batch_size))
    return res

def train(model, train_loader, val_loader, criterion, gpu):
  best_acc1 = 0
  batch_time = AverageMeter('Time', ':6.3f')
  data_time = AverageMeter('Data', ':6.3f')
  losses = AverageMeter('Loss', ':.4e')
  top1 = AverageMeter('Acc@1', ':6.2f')
  top5 = AverageMeter('Acc@5', ':6.2f')

  # The quantizer parameters and the network weights get separate learning rates.
  param_groups = [{
      'params': model.quantizer_parameters(),
      'lr': args.quantizer_lr,
      'name': 'quantizer'
  }, {
      'params': model.non_quantizer_parameters(),
      'lr': args.weight_lr,
      'name': 'weight'
  }]
  optimizer = torch.optim.Adam(
      param_groups, args.weight_lr, weight_decay=args.weight_decay)

  for epoch in range(args.epochs):
    progress = ProgressMeter(
        len(train_loader) * args.epochs,
        [batch_time, data_time, losses, top1, top5],
        prefix="Epoch[{}], Step: ".format(epoch))
    for i, (images, target) in enumerate(train_loader):
      end = time.time()

      # measure data loading time
      data_time.update(time.time() - end)
      step = len(train_loader) * epoch + i
      adjust_learning_rate(optimizer, epoch, step)
      loss, acc1, acc5 = train_one_step(model, (images, target), criterion,
                                        optimizer, step, gpu)

      # measure elapsed time
      batch_time.update(time.time() - end)
      end = time.time()
      losses.update(loss.item(), images.size(0))
      top1.update(acc1[0], images.size(0))
      top5.update(acc5[0], images.size(0))
      if step % args.display_freq == 0:
        progress.display(step)
      if step % args.val_freq == 0:
        # evaluate on validation set
        acc1 = validate(val_loader, model, criterion, gpu)

        # remember best acc@1 and save checkpoint
        is_best = acc1 > best_acc1
        best_acc1 = max(acc1, best_acc1)
        save_checkpoint(
            {
                'epoch': epoch + 1,
                'state_dict': model.state_dict(),
                'best_acc1': best_acc1
            }, is_best, args.save_dir)

def main():
  print('Used arguments:', args)
  traindir = os.path.join(args.data_dir, 'train')
  valdir = os.path.join(args.data_dir, 'validation')
  normalize = transforms.Normalize(
      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
  train_dataset = datasets.ImageFolder(
      traindir,
      transforms.Compose([
          transforms.RandomResizedCrop(224),
          transforms.RandomHorizontalFlip(),
          transforms.ToTensor(),
          normalize,
      ]))
  train_loader = torch.utils.data.DataLoader(
      train_dataset,
      batch_size=args.train_batch_size,
      shuffle=True,
      num_workers=args.workers,
      pin_memory=True)
  val_dataset = datasets.ImageFolder(
      valdir,
      transforms.Compose([
          transforms.Resize(256),
          transforms.CenterCrop(224),
          transforms.ToTensor(),
          normalize,
      ]))
  val_loader = torch.utils.data.DataLoader(
      val_dataset,
      batch_size=args.val_batch_size,
      shuffle=False,
      num_workers=args.workers,
      pin_memory=True)

  model = resnet18(pretrained=True)

  # define loss function (criterion) and optimizer
  criterion = nn.CrossEntropyLoss()
  gpu = 0
  inputs = torch.randn([args.train_batch_size, 3, 224, 224],
                       dtype=torch.float32).cuda(gpu)
  qat_processor = QatProcessor(
      model, inputs, bitwidth=8, device=torch.device('cuda:{}'.format(gpu)))

  if args.mode == 'train':
    # Step 1: Get quantized model and train it.
    quantized_model = qat_processor.trainable_model()
    criterion = criterion.cuda(gpu)
    train(quantized_model, train_loader, val_loader, criterion, gpu)

    # Step 2: Get deployable model and test it.
    # There may be some slight differences in accuracy with the quantized model.
    deployable_model = qat_processor.to_deployable(quantized_model,
                                                   args.output_dir)
    validate(val_loader, deployable_model, criterion, gpu)
  elif args.mode == 'deploy':
    # Step 3: Export xmodel from deployable model.
    deployable_model = qat_processor.deployable_model(
        args.output_dir, used_for_xmodel=True)
    val_subset = torch.utils.data.Subset(val_dataset, list(range(1)))
    subset_loader = torch.utils.data.DataLoader(
        val_subset,
        batch_size=1,
        shuffle=False,
        num_workers=args.workers,
        pin_memory=True)
    # Must forward the deployable model at least 1 iteration with batch_size=1
    for images, _ in subset_loader:
      deployable_model(images)
    qat_processor.export_xmodel(args.output_dir)
  else:
    raise ValueError('mode must be one of ["train", "deploy"]')

if __name__ == '__main__':
  main()

Following resnet18.py, the model is invoked directly. First run the script in train mode, then in deploy mode, as sketched below.
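A sketch of the two-phase invocation, assuming the listing above is saved as resnet18_qat.py (paths are placeholders):

# Phase 1: quantization-aware training; checkpoints go to --save_dir, the deployable model to --output_dir
python resnet18_qat.py --mode train --data_dir /path/to/dataset --pretrained /path/to/float.pth
# Phase 2: forward once with batch_size=1 and export the xmodel for compilation
python resnet18_qat.py --mode deploy --data_dir /path/to/dataset

To reuse this flow for YOLOv3, only the model construction and the dummy input change; the QatProcessor calls stay the same. A minimal sketch, assuming a load_yolov3() helper you supply and a 640x640 input:

import torch
from pytorch_nndct import QatProcessor

model = load_yolov3()  # hypothetical helper that returns your trained float YOLOv3
dummy_input = torch.randn([1, 3, 640, 640], dtype=torch.float32).cuda()
qat_processor = QatProcessor(
    model, dummy_input, bitwidth=8, device=torch.device('cuda:0'))

# fine-tune the quantized model with your existing YOLOv3 training loop
quantized_model = qat_processor.trainable_model()
# ... train quantized_model ...
deployable_model = qat_processor.to_deployable(quantized_model, 'qat_result')
# forward deployable_model once with batch_size=1, then export the xmodel
qat_processor.export_xmodel('qat_result')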
