海康工业相机触发模式抓图和AI目标检测

慈云数据 5个月前 (06-15) 技术支持 53 0

文章目录

  • 简介
  • hikrobot工业相机抓图
    • 安装驱动
    • 相机抓图
    • 硬件触发
    • 硬件触发接线图
    • AI目标检测接口
      • 对yolov5的封装
      • flask对http接口的实现

        简介

        本文实现用海康工业相机在触发模式下抓图(开发调试时用软件触发,实际部署时用硬件触发),并将抓到的图片交给AI目标检测接口处理。

        hikrobot工业相机抓图

        安装驱动

        去官网下载驱动时,请下载客户端,而不要选运行环境:客户端已经包含了运行环境,还附带示例和文档。下载包里有一个安装文件,文件名大致如 MVS-2.1.2_x86_64_20231011.deb

        # Install the MVS client package
        sudo dpkg -i MVS-2.1.2_x86_64_20231011.deb
        # To uninstall, use the following command
        sudo dpkg -r mvs
        # Launch the client
        cd /opt/MVS/bin/
        ./MVS
        

        文档在 /opt/MVS/doc/

        示例在 /opt/MVS/Samples

        python示例在 /opt/MVS/Samples/64/Python/

        相机抓图

        代码如下,使用2个相机,并为每个相机开启一个线程,循环调用软件触发来模拟触发信号。

        SDK 的回调有一个传参失效的坑(感觉是海康 SDK 的 bug),现象如下:

        使用 MV_CC_RegisterImageCallBackEx(call_back_fun, user_data) 注册回调,

        其中 user_data 是传给回调函数的用户数据指针。程序刚开始运行时传入的参数是正确的;运行一段时间后,指针地址仍然正确,但指针指向的数据内容却变了,推测是被 SDK 修改了(也可能是 Python 端没有保持对该 ctypes 对象的引用、内存被回收后复用所致)。所以这里注册了2个回调函数,并根据用户自定义的设备名称 DeviceUserID 来区分是哪个相机,2个相机分别对应不同的目标检测业务。

        # -*- coding: utf-8 -*-
        import sys
        import time
        import os
        import numpy as np
        import cv2
        from ctypes import *
        import termios
        from datetime import datetime
        import threading
        sys.path.append("/opt/MVS/Samples/64/Python/MvImport")  # 导入相应SDK的库,实际安装位置绝对路径
        from MvCameraControl_class import *
        # --- Settings below must be configured per deployment ---
        # Directory where grabbed images are saved; consumed by the app
        grab_dir="box-end/app/new_coming_images/"
        # DeviceUserID strings that start() compares against to pick the callback for each camera
        DEVICE_USER_ID_FRONT = "user_id_002"
        DEVICE_USER_ID_BACK = "user_id_001"
        # --- Other global defaults; no configuration needed ---
        CAM_LIST = []  # MvCamera objects appended by start()
        DEVICE_NUM = 0  # number of enumerated devices, set by start()
        GRAB_RUN = True  # polled by the trigger threads; set to False to make them exit
        SERVICE_ID = "200002"  # embedded in every saved image file name
        USER_DATA_PY = []  # one dict per camera ("cam", "device_idx"), indexed by the callbacks
        #DEVICE_IDX = None
        # Helpers for printing the details of enumerated devices
        def _c_chars_to_str(chars):
            """Return the text held in a fixed-size C char array, stopping at the first NUL byte."""
            text = []
            for per in chars:
                if per == 0:
                    break
                text.append(chr(per))
            return "".join(text)
        def printDeviceInfo(deviceList):
            """Print model name and address/serial information for every enumerated device.

            Args:
                deviceList: device list structure filled in by MV_CC_EnumDevices; its
                    nDeviceNum and pDeviceInfo fields are read.

            GigE devices are reported with their model name and current IPv4 address,
            USB3 devices with their model name and serial number. Devices of any other
            transport layer type only get the raw structure printed.
            """
            for i in range(0, deviceList.nDeviceNum):
                mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
                print("mvcc_dev_info",mvcc_dev_info)
                if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
                    gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo
                    print ("\ngige device: [%d]" % i)
                    # Stop at the NUL terminator, exactly as the USB branch does, so the
                    # printed name is not padded with "\x00" characters.
                    strModeName = _c_chars_to_str(gige_info.chModelName)
                    print ("device model name: %s" % strModeName)
                    # nCurrentIp packs the four IPv4 octets into one integer, most
                    # significant octet first.
                    nip1 = ((gige_info.nCurrentIp & 0xff000000) >> 24)
                    nip2 = ((gige_info.nCurrentIp & 0x00ff0000) >> 16)
                    nip3 = ((gige_info.nCurrentIp & 0x0000ff00) >> 8)
                    nip4 = (gige_info.nCurrentIp & 0x000000ff)
                    print ("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
                elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
                    usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo
                    print ("\nu3v device: [%d]" % i)
                    strModeName = _c_chars_to_str(usb_info.chModelName)
                    print ("device model name: %s" % strModeName)
                    strSerialNumber = _c_chars_to_str(usb_info.chSerialNumber)
                    print ("user serial number: %s" % strSerialNumber)
        # Decide whether a pixel format is colour or monochrome
        def IsImageColor(enType):
            """Classify a pixel type constant.

            Returns 'mono' for the Mono8/10/12 formats, 'color' for the RGB/BGR,
            YUV422 and Bayer formats listed below, and '未知' for anything else.
            """
            mono_types = {
                PixelType_Gvsp_Mono8,
                PixelType_Gvsp_Mono10,
                PixelType_Gvsp_Mono10_Packed,
                PixelType_Gvsp_Mono12,
                PixelType_Gvsp_Mono12_Packed,
            }
            color_types = {
                PixelType_Gvsp_RGB8_Packed,
                PixelType_Gvsp_BGR8_Packed,
                PixelType_Gvsp_YUV422_Packed,
                PixelType_Gvsp_YUV422_YUYV_Packed,
                PixelType_Gvsp_BayerGR8,
                PixelType_Gvsp_BayerRG8,
                PixelType_Gvsp_BayerGB8,
                PixelType_Gvsp_BayerBG8,
                PixelType_Gvsp_BayerGB10,
                PixelType_Gvsp_BayerGB10_Packed,
                PixelType_Gvsp_BayerBG10,
                PixelType_Gvsp_BayerBG10_Packed,
                PixelType_Gvsp_BayerRG10,
                PixelType_Gvsp_BayerRG10_Packed,
                PixelType_Gvsp_BayerGR10,
                PixelType_Gvsp_BayerGR10_Packed,
                PixelType_Gvsp_BayerGB12,
                PixelType_Gvsp_BayerGB12_Packed,
                PixelType_Gvsp_BayerBG12,
                PixelType_Gvsp_BayerBG12_Packed,
                PixelType_Gvsp_BayerRG12,
                PixelType_Gvsp_BayerRG12_Packed,
                PixelType_Gvsp_BayerGR12,
                PixelType_Gvsp_BayerGR12_Packed,
            }
            if enType in mono_types:
                return 'mono'
            if enType in color_types:
                return 'color'
            return '未知'
        # Frame callback: shared implementation used by the per-camera callbacks
        def image_callback_main(pData, pFrameInfo, device_idx):
            """Convert one delivered frame to an OpenCV image and save it as a JPEG.

            Args:
                pData: pointer to the raw frame bytes handed over by the SDK.
                pFrameInfo: pointer to the MV_FRAME_OUT_INFO_EX describing the frame.
                device_idx: index into USER_DATA_PY selecting the camera; device_idx + 1
                    is also embedded in the file name as the camera number.

            The frame is converted to Mono8 or BGR8, halved in width and height, and
            written into grab_dir. Frames with an unsupported pixel format, a failed
            conversion or a failed write are logged and dropped; the function returns
            None in every case.
            """
            global USER_DATA_PY
            stFrameInfo = cast(pFrameInfo, POINTER(MV_FRAME_OUT_INFO_EX)).contents
            # The camera is looked up by index instead of through the callback's user
            # pointer, because the value behind that pointer was observed to change.
            cam = USER_DATA_PY[device_idx]["cam"]
            if not stFrameInfo:
                return
            print("get one frame: Width[%d], Height[%d], nFrameNum[%d], enPixelType[%s]" % (stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.nFrameNum, stFrameInfo.enPixelType))
            time_start = time.time()
            # File name format: image-200001-4-2022-01-18-10-57-03-1642474623702.jpg
            img_name = "image-"+SERVICE_ID+"-"+str(device_idx+1)+"-"+datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
            img_path_tmp = os.path.join(grab_dir, img_name + ".jpeg")
            img_path_jpg = os.path.join(grab_dir, img_name + ".jpg")
            image_kind = IsImageColor(stFrameInfo.enPixelType)
            stConvertParam = MV_CC_PIXEL_CONVERT_PARAM()
            memset(byref(stConvertParam), 0, sizeof(stConvertParam))
            if image_kind == 'mono':
                print("mono!")
                stConvertParam.enDstPixelType = PixelType_Gvsp_Mono8
                nConvertSize = stFrameInfo.nWidth * stFrameInfo.nHeight
            elif image_kind == 'color':
                print("color!")
                stConvertParam.enDstPixelType = PixelType_Gvsp_BGR8_Packed  # OpenCV expects BGR, not RGB
                nConvertSize = stFrameInfo.nWidth * stFrameInfo.nHeight * 3
            else:
                # No destination format is known for this pixel type: drop the frame
                # instead of continuing with an undefined buffer size.
                print("not support!!!")
                print("")
                return

            stConvertParam.nWidth = stFrameInfo.nWidth
            stConvertParam.nHeight = stFrameInfo.nHeight
            stConvertParam.pSrcData = cast(pData, POINTER(c_ubyte))
            stConvertParam.nSrcDataLen = stFrameInfo.nFrameLen
            stConvertParam.enSrcPixelType = stFrameInfo.enPixelType
            stConvertParam.pDstBuffer = (c_ubyte * nConvertSize)()
            stConvertParam.nDstBufferSize = nConvertSize
            print('time cos 1:', time.time() - time_start, 's')
            ret = cam.MV_CC_ConvertPixelType(stConvertParam)
            print('time cos 2:', time.time() - time_start, 's')
            if ret != 0:
                # Log and drop this frame. Deleting a ctypes structure field raises
                # TypeError, and exiting from inside an SDK callback is not a clean
                # shutdown path, so neither is attempted here.
                print("convert pixel fail! ret[0x%x]" % ret)
                print("")
                return

            # Copy the converted pixels out of the SDK buffer and wrap them for OpenCV.
            # nDstLen is the number of bytes the conversion actually produced.
            dst_len = int(stConvertParam.nDstLen)
            img_buffer = (c_ubyte * dst_len)()
            memmove(byref(img_buffer), stConvertParam.pDstBuffer, dst_len)
            img_buffer = np.frombuffer(img_buffer, count=dst_len, dtype=np.uint8)
            if image_kind == 'mono':
                img_buffer = img_buffer.reshape((stFrameInfo.nHeight, stFrameInfo.nWidth))
            else:
                img_buffer = img_buffer.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, 3)
            print('time cos 3:', time.time() - time_start, 's')
            height, width = img_buffer.shape[0:2]
            img_buffer = cv2.resize(img_buffer, (int(width/2), int(height/2)), interpolation=cv2.INTER_AREA)
            print("img_path",img_path_tmp)
            # Write under a temporary ".jpeg" name and rename to ".jpg" afterwards,
            # presumably so the consumer of grab_dir never picks up a half-written file.
            if not cv2.imwrite(img_path_tmp, img_buffer):
                print("save image fail! path[%s]" % img_path_tmp)
                print("")
                return
            os.rename(img_path_tmp, img_path_jpg)
            print("")
        # ctypes prototype of the image callback: returns nothing and receives
        # (pointer to frame bytes, pointer to MV_FRAME_OUT_INFO_EX, void* user data)
        g_winfun_ctype = CFUNCTYPE
        g_st_frame_info = POINTER(MV_FRAME_OUT_INFO_EX)
        g_p_data = POINTER(c_ubyte)
        FrameInfoCallBack = g_winfun_ctype(None, g_p_data, g_st_frame_info, c_void_p)
        def image_callback_0(pData, pFrameInfo, pUser):
            """Frame callback that forwards to image_callback_main with device index 0; pUser is ignored."""
            image_callback_main(pData, pFrameInfo, 0)
        def image_callback_1(pData, pFrameInfo, pUser):
            """Frame callback that forwards to image_callback_main with device index 1; pUser is ignored."""
            image_callback_main(pData, pFrameInfo, 1)
        # Because the value passed through the callback's user pointer changes over time,
        # two separate callbacks are used for now to tell the cameras apart
        CALL_BACK_FUN_0 = FrameInfoCallBack(image_callback_0)
        CALL_BACK_FUN_1 = FrameInfoCallBack(image_callback_1)
        def press_any_key_exit():
            """Block until a key is pressed on stdin, then return.

            The terminal is switched out of canonical mode with echo disabled, so a
            single key press is delivered immediately and is not shown. The previous
            terminal settings are always restored before returning.
            """
            fd = sys.stdin.fileno()
            old_ttyinfo = termios.tcgetattr(fd)
            new_ttyinfo = old_ttyinfo[:]
            # Index 3 holds the local-mode flags
            new_ttyinfo[3] &= ~termios.ICANON
            new_ttyinfo[3] &= ~termios.ECHO
            termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
            try:
                # Up to 7 bytes, so a multi-byte escape sequence is consumed in one read
                os.read(fd, 7)
            except (OSError, KeyboardInterrupt):
                # A failed read or Ctrl-C counts as "key pressed": fall through so the
                # caller can continue with its shutdown.
                pass
            finally:
                termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)
        def add_trigger_event_thread(cam=0, idx=None):
            """Thread body that fires a software trigger on one camera every two seconds.

            Args:
                cam: camera object whose MV_CC_SetCommandValue method is called.
                idx: camera index; accepted for the caller's convenience, not used here.

            The loop ends when the module-level GRAB_RUN flag is cleared or when the
            trigger command returns a non-zero error code.
            """
            print("add_trigger_event_thread")
            while GRAB_RUN:
                ret = cam.MV_CC_SetCommandValue("TriggerSoftware")
                if ret != 0:
                    print("TriggerSoftware fail! ret[0x%x]" % ret)
                    break
                time.sleep(2)
        def start():
            """Enumerate the cameras, configure them for triggered grabbing and start them.

            For every enumerated device this creates a handle, opens the device,
            enables continuous auto exposure and auto gain, switches on trigger mode
            with the software trigger source, registers the image callback chosen by
            the device's DeviceUserID, starts grabbing, and finally starts a thread
            that issues software triggers.

            On any failure a message is printed, GRAB_RUN is set to False and the
            function returns early (None for the enumeration/handle/open/auto-mode
            steps, False for the later steps). Cameras already created stay in
            CAM_LIST. On success the function returns None.
            """
            global GRAB_RUN
            global CAM_LIST
            global DEVICE_NUM
            global USER_DATA_PY
            deviceList = MV_CC_DEVICE_INFO_LIST()
            tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
            # 1. Enumerate devices
            ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
            if ret != 0:
                print("enum devices fail! ret[0x%x]" % ret)
                GRAB_RUN = False
                return
            if deviceList.nDeviceNum == 0:
                print("find no device!")
                GRAB_RUN = False
                return
            print("Find %d devices!" % deviceList.nDeviceNum)
            # Print device details
            printDeviceInfo(deviceList)
            # 2. Open every device
            DEVICE_NUM = deviceList.nDeviceNum
            for i in range(0, DEVICE_NUM):
                # 2.1 Create the camera object and its handle
                cam = MvCamera()
                CAM_LIST.append(cam)
                stDeviceList = cast(deviceList.pDeviceInfo[int(i)], POINTER(MV_CC_DEVICE_INFO)).contents
                # The result must be captured here; otherwise the check below would
                # test the stale return code of an earlier call.
                ret = cam.MV_CC_CreateHandle(stDeviceList)
                if ret != 0:
                    print("create handle fail! ret[0x%x]" % ret)
                    GRAB_RUN = False
                    return
                # 2.2 Open the device
                ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
                if ret != 0:
                    print("open device fail! ret[0x%x]" % ret)
                    GRAB_RUN = False
                    return
                ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)
                if ret != 0:
                    print("set ExposureAuto fail! ret[0x%x]" % ret)
                    GRAB_RUN = False
                    return
                ret = cam.MV_CC_SetEnumValue("GainAuto",MV_GAIN_MODE_CONTINUOUS)
                if ret != 0:
                    print("set GainAuto fail! ret[0x%x]" % ret)
                    GRAB_RUN = False
                    return
                ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_ON)
                if ret != 0:
                    print('Enable trigger failed! [{0:#X}]'.format(ret))
                    GRAB_RUN = False
                    return False
                # For hardware triggering use MV_TRIGGER_SOURCE_LINE0 here instead
                ret = cam.MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_SOFTWARE)
                if ret != 0:
                    print('Set trigger source failed! [{0:#X}]'.format(ret))
                    GRAB_RUN = False
                    return False
                stStringValue = MVCC_STRINGVALUE()
                memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))
                ret = cam.MV_CC_GetStringValue("DeviceUserID", stStringValue)
                if ret != 0:
                    print("获取 string 型数据 %s 失败 ! 报错码 ret[0x%x]" % ("DeviceUserID", ret))
                    GRAB_RUN = False
                    return False
                device_user_id = bytes.decode(stStringValue.chCurValue)
                print("device_user_id", device_user_id)
                user_data = cast(pointer(py_object(i)), c_void_p)
                USER_DATA_PY.append({
                    "cam":cam,
                    "device_idx":i,
                    # Keep the ctypes object referenced for as long as the callback is
                    # registered. NOTE(review): dropping this reference is a plausible
                    # cause of the "user data changes after a while" symptom - confirm.
                    "user_data":user_data
                })
                # The callbacks do not rely on user_data; the camera is identified by
                # which callback is registered for its DeviceUserID.
                if device_user_id == DEVICE_USER_ID_FRONT:
                    call_back_fun = CALL_BACK_FUN_0
                elif device_user_id == DEVICE_USER_ID_BACK:
                    call_back_fun = CALL_BACK_FUN_1
                else:
                    # Without this branch call_back_fun would be unbound for the first
                    # camera, or silently reuse the previous camera's callback.
                    print("unknown DeviceUserID [%s], expected [%s] or [%s]" % (device_user_id, DEVICE_USER_ID_FRONT, DEVICE_USER_ID_BACK))
                    GRAB_RUN = False
                    return False
                ret = cam.MV_CC_RegisterImageCallBackEx(call_back_fun,user_data)
                if ret != 0:
                    print('Register callback failed! [{0:#X}]'.format(ret))
                    GRAB_RUN = False
                    return False
                ret = cam.MV_CC_StartGrabbing()
                if ret != 0:
                    print('Start grabbing failed! [{0:#X}]'.format(ret))
                    GRAB_RUN = False
                    return False
                # Start issuing software triggers only once the camera is grabbing
                hThreadHandle = threading.Thread(target=add_trigger_event_thread, args=(cam,i))
                hThreadHandle.start()
        def stop():
            """Stop grabbing on every created camera, close it and destroy its handle.

            GRAB_RUN is cleared first so the trigger threads stop issuing commands
            while the cameras are being shut down. Cleanup is best effort: a failing
            step is logged and the remaining steps and cameras are still processed.
            Iterating CAM_LIST itself (rather than range(DEVICE_NUM)) keeps this safe
            when start() returned early and created fewer cameras than were enumerated.
            """
            global GRAB_RUN
            GRAB_RUN = False
            for i, cam in enumerate(CAM_LIST):
                # 5.1 Stop grabbing
                print("stop grabbing device index[%d]" % i)
                ret = cam.MV_CC_StopGrabbing()
                if ret != 0:
                    print("stop grabbing fail! ret[0x%x]" % ret)
                # 5.2 Close the device
                ret = cam.MV_CC_CloseDevice()
                if ret != 0:
                    print("close deivce fail! ret[0x%x]" % ret)
                # 6 Destroy the handle
                ret = cam.MV_CC_DestroyHandle()
                if ret != 0:
                    print("destroy handle fail! ret[0x%x]" % ret)
        # Script entry: start all cameras, wait for a key press, then shut everything down
        start()
        print ("press a key to stop grabbing.")
        press_any_key_exit()
        stop()
        

        硬件触发

        改成硬件触发只需要修改一句

        TriggerSource的MV_TRIGGER_SOURCE_SOFTWARE改为MV_TRIGGER_SOURCE_LINE0

        # Hardware trigger: select MV_TRIGGER_SOURCE_LINE0 as the trigger source instead of the software trigger
        ret = CAM_LIST[i].MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_LINE0)
        

        硬件触发接线图

        1. 相机I/O管脚接口定义(线的颜色可能不同,但是功能是一样的)

        管脚 | 信号     | I/O信号源  | 说明             | 线缆颜色
        1    | DC_PWR   | —          | 相机电源         | —
        2    | OPTO_IN  | Line 0+    | 光耦隔离输入     | —
        3    | GPIO     | Line 2     | 可配置输入或输出 | —
        4    | OPTO_OUT | Line 1+    | 光耦隔离输出     | —
        5    | OPTO_GND | Line 0/1-  | 光耦隔离信号地   | 绿
        6    | GND      | Line 2-    | 相机电源地       | —

        触发硬件用的是光电开关(NPN型)。网上的接线图没有画出电阻的接线位置,实际电阻位置如下图所示:

        在这里插入图片描述

        AI目标检测接口

        对yolov5的封装

        # -*- coding: utf-8 -*-
        #导入需要的库
        import os
        import sys
        from pathlib import Path
        import numpy as np
        import cv2
        import torch
        import torch.backends.cudnn as cudnn
        from tqdm import tqdm
        import shutil
        import io 
        from models.common_2 import DetectMultiBackend
        from utils.dataloaders import IMG_FORMATS, VID_FORMATS, LoadImages, LoadStreams
        from utils.general import (LOGGER, check_file, check_img_size, check_imshow, check_requirements, colorstr,
                                   increment_path, non_max_suppression, print_args, scale_boxes, strip_optimizer, xyxy2xywh)
        from utils.plots import Annotator, colors, save_one_box
        from utils.torch_utils import select_device, time_sync
        #导入letterbox
        from utils.augmentations import Albumentations, augment_hsv, copy_paste, letterbox, mixup, random_perspective
        class U5():
            """Wrapper around a YOLOv5 model: load the weights once, then detect on single images."""
            def __init__(self, weights="/weights/last.pt", conf_thres=0.25):
                """Load the model and warm it up.

                Args:
                    weights: path to the .pt weights file, or a list whose first
                        element is that path.
                    conf_thres: object confidence threshold passed to NMS.
                """
                self.weights = weights
                self.imgsz=(640, 640)  # inference input size in pixels
                self.conf_thres=conf_thres  # object confidence threshold, used by NMS
                self.iou_thres=0.45  # IoU threshold, used by NMS
                self.max_det=1000  # maximum number of detections per image, used by NMS
                device=''  # device selector, e.g. '0', '0,1,2,3' or 'cpu'; '' lets select_device choose
                self.classes=None  # restrict NMS to these class indices; None keeps all classes
                self.agnostic_nms=False  # class-agnostic NMS
                self.augment=False  # test-time augmentation
                self.visualize=False  # feature-map visualisation
                self.half=False  # FP16 inference
                self.dnn=False  # use OpenCV DNN for ONNX inference
                # Select the device
                self.device = select_device(device)
                # Load the model from an in-memory copy of the weights file. The context
                # managers close the file and the buffer even if loading raises.
                w = str(weights[0] if isinstance(weights, list) else weights)
                print(type(w),w)
                with open(w, "rb") as source_file:
                    content = source_file.read()
                with io.BytesIO(content) as weights_bytes:
                    self.model = DetectMultiBackend(weights_bytes, model_type="pt", device=self.device, dnn=self.dnn)
                self.stride, self.names, self.pt, jit, onnx, engine = self.model.stride, self.model.names, self.model.pt, self.model.jit, self.model.onnx, self.model.engine
                # Validate the input size against the model stride and keep the result,
                # so warmup and detect() use the checked size.
                self.imgsz = tuple(check_img_size(self.imgsz, s=self.stride))
                print("names",self.names)
                # FP16 is only kept enabled on supported backends and off the CPU
                self.half &= (self.pt or jit or onnx or engine) and self.device.type != 'cpu'
                if self.pt or jit:
                    self.model.model.half() if self.half else self.model.model.float()
                # Warm up the model with one dummy batch
                self.model.warmup(imgsz=(1, 3, *self.imgsz))
            def detect(self, img):
                """Run detection on one image and return the detections.

                Args:
                    img: image array in HWC layout with BGR channel order (it is
                        transposed to CHW and channel-reversed before inference).

                Returns:
                    A list of dicts with keys 'class' (class name), 'conf' (float
                    confidence), 'position' (x-centre, y-centre, width, height divided
                    by the image width/height) and 'index' (integer class index).

                Each detection is also passed to an Annotator created on img, which
                presumably draws the labelled boxes onto img itself - confirm against
                utils.plots.Annotator.
                """
                im0 = img
                # Padded resize to the network input size
                im = letterbox(im0, self.imgsz, self.stride, auto=self.pt)[0]
                # HWC to CHW, BGR to RGB
                im = im.transpose((2, 0, 1))[::-1]
                im = np.ascontiguousarray(im)
                im = torch.from_numpy(im).to(self.device)
                im = im.half() if self.half else im.float()  # uint8 to fp16/32
                im /= 255  # 0 - 255 to 0.0 - 1.0
                if len(im.shape) == 3:
                    im = im[None]  # add the batch dimension
                t2 = time_sync()

                # Inference
                pred = self.model(im, augment=self.augment, visualize=self.visualize)
                t3 = time_sync()

                # NMS
                pred = non_max_suppression(pred, self.conf_thres, self.iou_thres, self.classes, self.agnostic_nms, max_det=self.max_det)

                detections = []
                line_thickness = 3
                annotator = Annotator(im0, line_width=line_thickness, example=str(self.names))
                # Normalisation gain (w, h, w, h) taken from the original image shape
                gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]
                for det in pred:  # one entry per image in the batch
                    if not len(det):
                        continue
                    # Rescale boxes from the network input size back to the original image
                    det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round()
                    for *xyxy, conf, index in reversed(det):
                        xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()
                        cls = self.names[int(index)]
                        conf = float(conf)
                        detections.append({'class': cls, 'conf': conf, 'position': xywh, 'index': int(index)})
                        annotator.box_label(xyxy, cls, color=colors(int(index), True))

                # Inference time
                print(f'({t3 - t2:.3f}s)')
                return detections
        

        flask对http接口的实现

        # -*- coding: utf-8 -*-
        # +
        from distutils.command.config import config
        import sys
        #sys.path.append('/home/wai/hik/yolo/yolov5_s/')
        sys.path.append('yolov5_s/')
        from flask import Flask, g, jsonify, make_response, request, render_template
        import base64
        import numpy as np
        import cv2
        from sdk import U5
        import time
        app = Flask(__name__)
        #app.json.ensure_ascii = False
        # --- Settings below must be configured per deployment ---
        #u5 = U5(weights = '/home/wai/hik/yolo/yolov5_s/runs/train/exp/weights/last.pt')
        # Detector instance shared by all requests; the weights are loaded once at import time
        u5 = U5(weights = 'yolov5_s/runs/train/exp/weights/last.pt')
        # File containing the code read by the barcode scanner; it selects the expected
        # part counts for the car-door type
        SCAN_CODE_FILE = "/home/wai/hik/code/scan_code.txt"
        # Expected number of detections per class, keyed by scan code ("no") and by
        # side ("front" / "back")
        CFG_CORRECT_ANSWER = [
            {
                "no":"000001",
                "cfg":{
                    "front":{
                        "screw_3":2,
                        "lines":5,
                        "block_1":1,
                        "block_2":2
                    },
                    "back":{
                        "screw_1":7,
                        "screw_2":29,
                        "wang":0
                    }
                }
            },
            {
                "no":"000002",
                "cfg":{
                    "front":{
                        "screw_1":7,
                        "screw_2":29,
                        "wang":0
                    },
                    "back":{
                        "screw_3":2,
                        "lines":5,
                        "block_1":1,
                        "block_2":2
                    }
                }
            },
        ]
        def detections_count(detections,key):
            """Return how many entries of detections have their "class" value equal to key."""
            return sum(1 for entry in detections if entry["class"] == key)
        @app.route('/api/ai_detection', methods=['POST'])
        def ai_detection():
            """Detect parts in the posted image and judge it OK/NG against the expected counts.

            Request JSON:
                images: list whose first element is the base64 image, with or without
                    a leading data-URI prefix such as "data:image/jpeg;base64,".
                images_id: list whose first element is the image name; its third
                    "-"-separated field is the camera id ("1" front, "2" back).

            Response JSON:
                When the scan code read from SCAN_CODE_FILE has no entry in
                CFG_CORRECT_ANSWER: the original image, 'boxes', 'image_id' and
                'scene_name'. Otherwise: the image with "OK"/"NG" drawn on it,
                'boxes', 'image_id' and 'good' (1 when every expected class count
                matches the detections, else 0).
                Malformed requests get a JSON error body with HTTP status 400.
            """
            print("==> ai_detection start")
            time_start = time.time()
            images = request.json.get('images')
            images_id = request.json.get('images_id')
            if not images or not images_id:
                return jsonify({'code': 400, 'msg': 'images and images_id are required'}), 400
            # Drop an optional data-URI prefix whatever its length; base64 text itself
            # never contains a comma.
            img_data = base64.b64decode(images[0].split(",", 1)[-1])
            img_array = np.frombuffer(img_data, np.uint8)
            # imdecode takes an IMREAD_* flag; IMREAD_COLOR always yields a 3-channel
            # BGR image, which is what the detector's preprocessing expects.
            img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
            if img is None:
                return jsonify({'code': 400, 'msg': 'image could not be decoded'}), 400
            print('time cos detect 1:', time.time()-time_start, 's')
            detections = u5.detect(img)
            print('time cos detect 2:', time.time()-time_start, 's')
            print("detections",detections)
            # Read the code written by the barcode scanner; strip the surrounding
            # whitespace so a trailing newline does not defeat the lookup.
            with open(SCAN_CODE_FILE, "r") as fileRead:
                scan_code = fileRead.read().strip()
            print("scan_code",scan_code)
            # Look up the expected counts for this scan code
            cfg = None
            for item in CFG_CORRECT_ANSWER:
                if item["no"] == scan_code:
                    cfg = item["cfg"]
                    break
            if cfg is None:
                result = {
                    'boxes': [],
                    'image': images[0],
                    'image_id': images_id[0],
                    'scene_name': 'detect'
                }
            else:
                # The camera id decides the side: "1" is the front, "2" is the back
                id_fields = images_id[0].split("-")
                camera_id = id_fields[2] if len(id_fields) > 2 else ""
                print("==> camera_id",camera_id)
                if camera_id == "1":
                    part_num = cfg["front"]
                elif camera_id == "2":
                    part_num = cfg["back"]
                else:
                    # An unknown camera cannot be judged; report it instead of failing
                    # later on an undefined text position.
                    return jsonify({'code': 400, 'msg': 'unknown camera id in image_id'}), 400
                print("correct_cfg",part_num)
                good = 1
                for key in part_num:
                    value = part_num[key]
                    dc = detections_count(detections,key)
                    print("==> ",key,"ai_count",dc,"correct_count",value)
                    if value != dc:
                        good = 0
                        break
                # Draw the verdict on the image: near the right edge for the front
                # camera, near the left edge for the back camera
                height, width = img.shape[0:2]
                if camera_id == "1":
                    result_pos = (width-300, 200)
                else:
                    result_pos = (100, 200)
                if good == 1:
                    result_text = "OK"
                    result_color = (0,128,0)
                else:
                    result_text = "NG"
                    result_color = (0,0,255)
                cv2.putText(img, result_text, result_pos, cv2.FONT_HERSHEY_COMPLEX, 5.0, result_color, 10)
                encoded_img = cv2.imencode('.jpg', img)[1]
                image_result_base64 = 'data:image/jpeg;base64,' + base64.b64encode(encoded_img).decode('ascii')
                result = {
                    'boxes': [],
                    'image': image_result_base64,
                    'image_id': images_id[0],
                    'good': good
                }
            print('time cos detect 3:', time.time()-time_start, 's')
            print("")
            return jsonify(result)
        # Run the Flask server on all interfaces, port 5000, when executed as a script
        if __name__ == '__main__':
            app.run(host='0.0.0.0',port=5000)
        
微信扫一扫加客服

微信扫一扫加客服

点击启动AI问答
Draggable Icon