Python OpenCV使用dlib进行多目标跟踪详解

目录
  • 1.使用dlib进行多目标跟踪
  • 2.项目结构
  • 3.dlib多对象跟踪的简单“朴素”方法
  • 4.快速、高效的dlib多对象跟踪实现
  • 5.完整代码
  • 6.改进和建议

在本教程中,您将学习如何使用 dlib 库在实时视频中有效地跟踪多个对象。

我们当然可以使用 dlib 跟踪多个对象;但是,为了获得可能的最佳性能,我们需要利用多处理并将对象跟踪器分布在处理器的多个内核上。

正确利用多处理使我们能够将 dlib 多对象跟踪每秒帧数 (FPS) 提高 45% 以上!

1.使用 dlib 进行多目标跟踪

在本指南的第一部分,我将演示如何实现一个简单、朴素的 dlib 多对象跟踪脚本。该程序将跟踪视频中的多个对象;但是,我们会注意到脚本运行速度有点慢。 为了提高我们的 FPS,我将向您展示一个更快、更高效的 dlib 多对象跟踪器实现。 最后,我将讨论一些改进和建议,以增强我们的多对象跟踪实现。

2.项目结构

你可以使用tree命令查看我们的项目结构:

mobilenet_ssd/ 目录包含我们的 MobileNet + SSD Caffe 模型文件,它允许我们检测人(以及其他对象)。 今天我们将回顾两个 Python 脚本:

  • multi_object_tracking_slow.py:dlib 多对象跟踪的简单“朴素”方法。
  • multi_object_tracking_fast.py:利用多处理的先进、快速的方法。

3.dlib 多对象跟踪的简单“朴素”方法

我们今天要介绍的第一个 dlib 多对象跟踪实现是“朴素的”,因为它将:

1.使用一个简单的跟踪器对象列表。

2.仅使用我们处理器的单个内核按顺序更新每个跟踪器。

对于某些对象跟踪任务,此实现将绰绰有余;然而,为了优化我们的 FPS,我们应该将对象跟踪器分布在多个进程中。

我们将从本节中的简单实现开始,然后在下一节中转到更快的方法。 首先,打开multi_object_tracking_slow.py 脚本并插入以下代码:

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

让我们解析我们的命令行参数:

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

我们的脚本在运行时处理以下命令行参数:

  • --prototxt :Caffe 部署 prototxt 文件的路径。
  • --model : prototxt 附带的模型文件的路径。
  • --video : 输入视频文件的路径。我们将在此视频中使用 dlib 执行多对象跟踪。
  • --output :输出视频文件的可选路径。如果未指定路径,则不会将视频输出到磁盘。我建议输出到 .avi 或 .mp4 文件。
  • --confidence :对象检测置信度阈值 ,默认是0.2 ,该值表示从对象检测器过滤弱检测的最小概率。

让我们定义这个模型支持的类列表,并从磁盘加载我们的模型:

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

我们只关心今天的赛跑示例中的“人”类,但您可以轻松修改以跟踪其他类。 我们加载了预训练的对象检测器模型。我们将使用我们预训练的 SSD 来检测视频中物体的存在。我们将创建一个 dlib 对象跟踪器来跟踪每个检测到的对象。

我们还有一些初始化要执行:

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []
# start the frames per second throughput estimator
fps = FPS().start()

我们初始化我们的视频流——我们将从输入视频中一次读取一个帧。 随后,我们的视频writer被初始化为 None 。在即将到来的 while 循环中,我们将与视频writer进行更多合作。 现在初始化我们的跟踪器和标签列表。 最后,开始我们的每秒帧数计数器。 我们都准备好开始处理视频了:

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

将帧调整为600像素宽,保持高宽比。然后,为了dlib兼容性,帧被转换为RGB颜色通道排序(OpenCV的默认值是BGR,而dlib的默认值是RGB)。

让我们开始对象检测阶段:

    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

为了执行对象跟踪,我们必须首先执行对象检测

  • 手动,通过停止视频流并手动选择每个对象的边界框。
  • 以编程方式,使用经过训练的对象检测器来检测对象的存在(这就是我们在这里所做的)。

如果没有对象跟踪器,那么我们知道我们还没有执行对象检测。

我们创建并通过 SSD 网络传递一个 blob 以检测对象。

接下来,我们继续循环检测以查找属于person类的对象,因为我们的输入视频是人类的赛跑:

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

我们开始循环检测,其中我们:

  • 过滤掉弱检测。
  • 确保每个检测都是一个person。当然,您可以删除这行代码或根据您自己的过滤需求对其进行自定义。

现在我们已经在框架中定位了每个person,让我们实例化我们的跟踪器并绘制我们的初始边界框 + 类标签:

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)
                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

要开始跟踪对象,我们:

  • 计算每个检测到的对象的边界框。
  • 实例化边界框坐标并将其传递给跟踪器。边界框在这里尤为重要。我们需要为边界框创建一个 dlib.rectangle 并将其传递给 start_track 方法。然后,dlib 可以开始跟踪对象。
  • 最后,我们用单个跟踪器填充trackers列表。

因此,在下一个代码块中,我们将处理已经建立跟踪器并且只需要更新位置的情况。 我们在初始检测步骤中执行了两个额外的任务:

  • 将类标签附加到标签列表。如果您要跟踪多种类型的对象(例如dog+person),您可能希望知道每个对象的类型。
  • 在对象周围绘制每个边界框矩形和类标签。

如果我们的检测列表的长度大于0,我们就知道我们处于目标跟踪阶段:

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

在目标跟踪阶段,我们遍历所有trackers和相应的labels。然后我们继续update每个对象的位置。为了更新位置,我们只需传递 rgb 图像。

提取边界框坐标后,我们可以为每个被跟踪对象绘制一个边界框rectangle和label。

帧处理循环中的其余步骤涉及写入输出视频(如有必要)并显示结果:

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()

在这里,我们:

  • 如有必要,将frame写入视频。
  • 显示输出帧并捕获按键。如果按下q键(退出),我们就会跳出循环。 最后,我们更新我们的每秒帧数信息以进行基准测试。

剩下的步骤是在终端打印FPS信息并释放指针:

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

让我们评估准确性和性能。打开终端并执行以下命令:

$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_slow.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 24.51
[INFO] approx. FPS: 13.87

看来我们的多目标跟踪器起作用了!

但正如你所看到的,我们只获得了约13帧/秒。

对于某些应用程序来说,这个FPS可能已经足够了——然而,如果你需要更快的FPS,我建议你看看下面我们更高效的dlib多对象跟踪器。其次,要明白跟踪的准确性并不完美。

4.快速、高效的 dlib 多对象跟踪实现

如果您运行上一节中的 dlib 多对象跟踪脚本并同时打开系统的监视器,您会注意到只使用了处理器的一个内核。

如果您运行上一节中的 dlib 多对象跟踪脚本并同时打开系统的活动监视器,您会注意到只使用了处理器的一个内核。

利用进程使我们的操作系统能够执行更好的进程调度,将进程映射到我们机器上的特定处理器内核(大多数现代操作系统能够以并行方式有效地调度使用大量 CPU 的进程)。

继续打开 mutli_object_tracking_fast.py 并插入以下代码:

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

我们将使用 Python Process 类来生成一个新进程——每个新进程都独立于原始进程。

为了生成这个进程,我们需要提供一个 Python 可以调用的函数,然后 Python 将使用该函数并创建一个全新的进程并执行它:

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)

start_tracker 的前三个参数包括:

  • box :我们要跟踪的对象的边界框坐标,可能是由某种对象检测器返回的,无论是手动的还是编程的。
  • label :对象的人类可读标签。
  • rgb :我们将用于启动初始 dlib 对象跟踪器的 RGB 图像。

请记住Python多处理是如何工作的——Python将调用这个函数,然后创建一个全新的解释器来执行其中的代码。因此,每个生成的start_tracker进程都将独立于它的父进程。为了与Python驱动程序脚本通信,我们需要利用管道或队列(Pipes and Queues)。这两种类型的对象都是线程/进程安全的,使用锁和信号量来完成。

本质上,我们正在创建一个简单的生产者/消费者关系:

  • 我们的父进程将生成新帧并将它们添加到特定对象跟踪器的队列中。
  • 然后子进程将消耗帧,应用对象跟踪,然后返回更新的边界框坐标。

我决定在这篇文章中使用 Queue 对象;但是,请记住,如果您愿意,也可以使用Pipe

现在让我们开始一个无限循环,它将在进程中运行:

    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()
        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))

我们在这里无限循环——这个函数将作为守护进程调用,所以我们不需要担心加入它。

首先,我们将尝试从 inputQueue 中抓取一个新帧。如果帧不为空,我们将抓取帧,然后更新对象跟踪器,让我们获得更新后的边界框坐标。

最后,我们将标签和边界框写入 outputQueue,以便父进程可以在脚本的主循环中使用它们。

回到父进程,我们将解析命令行参数:

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

此脚本的命令行参数与我们较慢的非多处理脚本完全相同。

让我们初始化我们的输入和输出队列:

# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

这些队列将保存我们正在跟踪的对象。生成的每个进程都需要两个 Queue 对象:

  • 一个从其中读取输入帧
  • 另一个将结果写入

下一个代码块与我们之前的脚本相同:

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# start the frames per second throughput estimator
fps = FPS().start()

我们定义模型的 CLASSES 并加载模型本身。

现在让我们开始循环视频流中的帧:

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

现在让我们处理没有 inputQueues 的情况:

    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

如果没有 inputQueues,那么我们需要在对象跟踪之前应用对象检测。 我们应用对象检测,然后继续循环。我们获取置信度值并过滤掉弱检测。 如果我们的置信度满足我们的命令行参数建立的阈值,我们会考虑检测,但我们会通过类标签进一步过滤掉它。在这种情况下,我们只寻找person对象。 假设我们找到了一个person,我们将创建队列和生成跟踪进程:

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)
                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)
                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

我们首先计算边界框坐标。从那里我们创建两个新队列 iq 和 oq,分别将它们附加到 inputQueues 和 outputQueues。我们生成一个新的 start_tracker 进程,传递边界框、标签、rgb 图像和 iq + oq。

我们还绘制了检测到的对象的边界框rectangle和类标签label。

否则,我们已经执行了对象检测,因此我们需要将每个 dlib 对象跟踪器应用于帧:

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input ques and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)
        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()
            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

遍历每个 inputQueues ,我们将 rgb 图像添加到它们。然后我们遍历每个outputQueues,从每个独立的对象跟踪器获取边界框坐标。最后,我们绘制边界框+关联的类标签label。

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

如有必要,我们将帧写入输出视频,并将帧显示到屏幕。 如果按下q键,我们退出,跳出循环。 如果我们继续处理帧,我们的 FPS 计算器会更新,然后我们再次在 while 循环的开头开始处理。 否则,我们处理完帧,我们显示 FPS 信息 + 释放指针并关闭窗口。

打开终端并执行以下命令:

$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_fast.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 14.01
[INFO] approx. FPS: 24.26

如您所见,我们更快、更高效的多对象跟踪器以 24 FPS 运行,比我们之前的实现提高了 45% 以上?! 此外,如果您在此脚本运行时打开活动监视器,您将看到更多系统的CPU 正在被使用。 这种加速是通过允许每个 dlib 对象跟踪器在单独的进程中运行来获得的,这反过来又使您的操作系统能够执行更有效的 CPU 资源调度。

5.完整代码

multi_object_tracking_slow.py

# USAGE
# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
# 	--model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
	help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
	help="path to Caffe pre-trained model")
# ap.add_argument("-v", "--video", required=True,
# 	help="path to input video file")
ap.add_argument("-v", "--video",
	help="path to input video file")
ap.add_argument("-o", "--output", type=str,
	help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
	help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
	"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
	"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
	"sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
# vs = cv2.VideoCapture(args["video"])
vs = cv2.VideoCapture(0)
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
	# grab the next frame from the video file
	(grabbed, frame) = vs.read()

	# check to see if we have reached the end of the video file
	if frame is None:
		break

	# resize the frame for faster processing and then convert the
	# frame from BGR to RGB ordering (dlib needs RGB ordering)
	frame = imutils.resize(frame, width=600)
	rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

	# if we are supposed to be writing a video to disk, initialize
	# the writer
	if args["output"] is not None and writer is None:
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 30,
			(frame.shape[1], frame.shape[0]), True)

	# if there are no object trackers we first need to detect objects
	# and then create a tracker for each object
	if len(trackers) == 0:
		# grab the frame dimensions and convert the frame to a blob
		(h, w) = frame.shape[:2]
		blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

		# pass the blob through the network and obtain the detections
		# and predictions
		net.setInput(blob)
		detections = net.forward()

		# loop over the detections
		for i in np.arange(0, detections.shape[2]):
			# extract the confidence (i.e., probability) associated
			# with the prediction
			confidence = detections[0, 0, i, 2]

			# filter out weak detections by requiring a minimum
			# confidence
			if confidence > args["confidence"]:
				# extract the index of the class label from the
				# detections list
				idx = int(detections[0, 0, i, 1])
				label = CLASSES[idx]

				# if the class label is not a person, ignore it
				if CLASSES[idx] != "person":
					continue

				# compute the (x, y)-coordinates of the bounding box
				# for the object
				box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
				(startX, startY, endX, endY) = box.astype("int")

				# construct a dlib rectangle object from the bounding
				# box coordinates and start the correlation tracker
				t = dlib.correlation_tracker()
				rect = dlib.rectangle(startX, startY, endX, endY)
				t.start_track(rgb, rect)

				# update our set of trackers and corresponding class
				# labels
				labels.append(label)
				trackers.append(t)

				# grab the corresponding class label for the detection
				# and draw the bounding box
				cv2.rectangle(frame, (startX, startY), (endX, endY),
					(0, 255, 0), 2)
				cv2.putText(frame, label, (startX, startY - 15),
					cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# otherwise, we've already performed detection so let's track
	# multiple objects
	else:
		# loop over each of the trackers
		for (t, l) in zip(trackers, labels):
			# update the tracker and grab the position of the tracked
			# object
			t.update(rgb)
			pos = t.get_position()

			# unpack the position object
			startX = int(pos.left())
			startY = int(pos.top())
			endX = int(pos.right())
			endY = int(pos.bottom())

			# draw the bounding box from the correlation object tracker
			cv2.rectangle(frame, (startX, startY), (endX, endY),
				(0, 255, 0), 2)
			cv2.putText(frame, l, (startX, startY - 15),
				cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# check to see if we should write the frame to disk
	if writer is not None:
		writer.write(frame)

	# show the output frame
	cv2.imshow("Frame", frame)
	key = cv2.waitKey(1) & 0xFF

	# if the `q` key was pressed, break from the loop
	if key == ord("q"):
		break

	# update the FPS counter
	fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
	writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

multi_object_tracking_fast.py

# USAGE
# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#	--model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

def start_tracker(box, label, rgb, inputQueue, outputQueue):
	# construct a dlib rectangle object from the bounding box
	# coordinates and then start the correlation tracker
	t = dlib.correlation_tracker()
	rect = dlib.rectangle(box[0], box[1], box[2], box[3])
	t.start_track(rgb, rect)

	# loop indefinitely -- this function will be called as a daemon
	# process so we don't need to worry about joining it
	while True:
		# attempt to grab the next frame from the input queue
		rgb = inputQueue.get()

		# if there was an entry in our queue, process it
		if rgb is not None:
			# update the tracker and grab the position of the tracked
			# object
			t.update(rgb)
			pos = t.get_position()

			# unpack the position object
			startX = int(pos.left())
			startY = int(pos.top())
			endX = int(pos.right())
			endY = int(pos.bottom())

			# add the label + bounding box coordinates to the output
			# queue
			outputQueue.put((label, (startX, startY, endX, endY)))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
	help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
	help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
	help="path to input video file")
ap.add_argument("-o", "--output", type=str,
	help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
	help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize our list of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
	"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
	"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
	"sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
	# grab the next frame from the video file
	(grabbed, frame) = vs.read()

	# check to see if we have reached the end of the video file
	if frame is None:
		break

	# resize the frame for faster processing and then convert the
	# frame from BGR to RGB ordering (dlib needs RGB ordering)
	frame = imutils.resize(frame, width=600)
	rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

	# if we are supposed to be writing a video to disk, initialize
	# the writer
	if args["output"] is not None and writer is None:
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 30,
			(frame.shape[1], frame.shape[0]), True)

	# if our list of queues is empty then we know we have yet to
	# create our first object tracker
	if len(inputQueues) == 0:
		# grab the frame dimensions and convert the frame to a blob
		(h, w) = frame.shape[:2]
		blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

		# pass the blob through the network and obtain the detections
		# and predictions
		net.setInput(blob)
		detections = net.forward()

		# loop over the detections
		for i in np.arange(0, detections.shape[2]):
			# extract the confidence (i.e., probability) associated
			# with the prediction
			confidence = detections[0, 0, i, 2]

			# filter out weak detections by requiring a minimum
			# confidence
			if confidence > args["confidence"]:
				# extract the index of the class label from the
				# detections list
				idx = int(detections[0, 0, i, 1])
				label = CLASSES[idx]

				# if the class label is not a person, ignore it
				if CLASSES[idx] != "person":
					continue

				# compute the (x, y)-coordinates of the bounding box
				# for the object
				box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
				(startX, startY, endX, endY) = box.astype("int")
				bb = (startX, startY, endX, endY)

				# create two brand new input and output queues,
				# respectively
				iq = multiprocessing.Queue()
				oq = multiprocessing.Queue()
				inputQueues.append(iq)
				outputQueues.append(oq)

				# spawn a daemon process for a new object tracker
				p = multiprocessing.Process(
					target=start_tracker,
					args=(bb, label, rgb, iq, oq))
				p.daemon = True
				p.start()

				# grab the corresponding class label for the detection
				# and draw the bounding box
				cv2.rectangle(frame, (startX, startY), (endX, endY),
					(0, 255, 0), 2)
				cv2.putText(frame, label, (startX, startY - 15),
					cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# otherwise, we've already performed detection so let's track
	# multiple objects
	else:
		# loop over each of our input ques and add the input RGB
		# frame to it, enabling us to update each of the respective
		# object trackers running in separate processes
		for iq in inputQueues:
			iq.put(rgb)

		# loop over each of the output queues
		for oq in outputQueues:
			# grab the updated bounding box coordinates for the
			# object -- the .get method is a blocking operation so
			# this will pause our execution until the respective
			# process finishes the tracking update
			(label, (startX, startY, endX, endY)) = oq.get()

			# draw the bounding box from the correlation object
			# tracker
			cv2.rectangle(frame, (startX, startY), (endX, endY),
				(0, 255, 0), 2)
			cv2.putText(frame, label, (startX, startY - 15),
				cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# check to see if we should write the frame to disk
	if writer is not None:
		writer.write(frame)

	# show the output frame
	cv2.imshow("Frame", frame)
	key = cv2.waitKey(1) & 0xFF

	# if the `q` key was pressed, break from the loop
	if key == ord("q"):
		break

	# update the FPS counter
	fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
	writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

链接:https://pan.baidu.com/s/1WhJr-Qxh5Wu3TsXKRiTHRg 提取码:1234

6.改进和建议

我今天与大家分享的 dlib 多对象跟踪 Python 脚本可以很好地处理较短的视频流;但是,如果您打算将此实现用于长时间运行的生产环境(大约数小时到数天的视频),我建议您进行两项主要改进:

第一个改进是利用进程池,而不是为每个要跟踪的对象生成一个全新的进程。今天在这里介绍的实现为我们需要跟踪的每个对象构建了一个全新的队列Queue和进程Process。

对于今天的目的来说这很好,但考虑一下如果您想跟踪视频中的 50 个对象——这意味着您将生成 50 个进程,每个对象一个。那时,系统管理所有这些进程的开销将破坏 FPS 的任何增加。相反,您可能希望利用进程池。

如果您的系统有 N 个处理器内核,那么您需要创建一个包含 N – 1 个进程的池,将一个内核留给您的操作系统来执行系统操作。这些进程中的每一个都应该执行多个对象跟踪,维护一个对象跟踪器列表,类似于我们今天介绍的第一个多对象跟踪。

这种改进将允许您利用处理器的所有内核,而无需产生许多独立进程的开销。

我要做的第二个改进是清理进程和队列。如果 dlib 将对象报告为“丢失”或“消失”,我们不会从 start_tracker 函数返回,这意味着该进程将在父脚本的生命周期内存活,并且仅在父脚本退出时被终止。

同样,这对于我们今天的目的来说很好,但是如果您打算在生产环境中使用此代码,您应该:

  • 更新 start_tracker 函数以在 dlib 报告对象丢失后返回。
  • 同时删除对应进程的 inputQueue 和 outputQueue。

未能执行此清理将导致长时间运行作业的不必要的计算消耗和内存开销。

第三个改进是通过每 N 帧运行一次对象检测器(而不是在开始时只运行一次)来提高跟踪精度。

实际上,我在使用 OpenCV 计数的文章中演示了这一点。它需要更多的逻辑和思考,但会产生更准确的跟踪器。 我选择放弃这个脚本的实现,这样我就可以简明地教你多处理方法。 理想情况下,除了多处理之外,您还可以使用第三个改进。

以上就是Python OpenCV使用dlib进行多目标跟踪详解的详细内容,更多关于OpenCV dlib多目标跟踪的资料请关注我们其它相关文章!

(0)

相关推荐

  • Android 中使用 dlib+opencv 实现动态人脸检测功能

    1 概述 完成 Android 相机预览功能以后,在此基础上我使用 dlib 与 opencv 库做了一个关于人脸检测的 demo.该 demo 在相机预览过程中对人脸进行实时检测,并将检测到的人脸用矩形框描绘出来.具体实现原理如下: 采用双层 View,底层的 TextureView 用于预览,程序从 TextureView 中获取预览帧数据,然后调用 dlib 库对帧数据进行处理,最后将检测结果绘制在顶层的 SurfaceView 中. 2 项目配置 由于项目中用到了 dlib 与 open

  • Dlib+OpenCV深度学习人脸识别的方法示例

    前言 人脸识别在LWF(Labeled Faces in the Wild)数据集上人脸识别率现在已经99.7%以上,这个识别率确实非常高了,但是真实的环境中的准确率有多少呢?我没有这方面的数据,但是可以确信的是真实环境中的识别率并没有那么乐观.现在虽然有一些商业应用如员工人脸识别管理系统.海关身份验证系统.甚至是银行人脸识别功能,但是我们可以仔细想想员工人脸识别管理,海关身份证系统的应用场景对身份的验证功能其实并没有商家吹嘘的那么重要,打个比方说员工上班的时候刷脸如果失败了会怎样,是不是重新识

  • 基于Python OpenCV和 dlib实现眨眼检测

    目录 了解"眼睛纵横比"(EAR) 使用面部标志和 OpenCV 检测眨眼 眨眼检测结果 总结 今天,我们使用面部标记和 OpenCV 检测视频流中的眨眼次数. 为了构建我们的眨眼检测器,我们将计算一个称为眼睛纵横比 (EAR) 的指标,该指标由 Soukupová 和 Čech 在他们 2016 年的论文<使用面部标记的实时眨眼检测>中介绍. 与计算眨眼的传统图像处理方法不同,传统的图像处理方法通常涉及以下某些组合: 眼睛定位. 阈值以找到眼白. 确定眼睛的"白

  • python实现单目标、多目标、多尺度、自定义特征的KCF跟踪算法(实例代码)

    单目标跟踪: 直接调用opencv中封装的tracker即可. #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Sun Jan 5 17:50:47 2020 第四章 kcf跟踪 @author: youxinlin """ import cv2 from items import MessageItem import time import numpy as np ''

  • 树莓派上利用python+opencv+dlib实现嘴唇检测的实现

    目录 1.安装相关库文件 2.代码部分 3.实验效果 树莓派上利用python+opencv+dlib实现嘴唇检测 项目的目标是在树莓派上运行python代码以实现嘴唇检测,本来以为树莓派的硬件是可以流畅运行实时检测的,但是实验的效果表明树莓派实时检测是不可行,后面还需要改进. 实验的效果如下: 1.安装相关库文件 这里需要用的库有opencv,numpy,dlib. 1.1 安装opencv pip3 install opencv-python 1.2 安装numpy 树莓派中自带了numpy

  • 超详细注释之OpenCV dlib实现人脸采集

    上一篇博客中,我们了解了什么是面部标志,以及如何使用dlib,OpenCV和Python检测它们.利用dlib的HOG SVM的形状预测器获得面部ROI中面部区域的68个点(x,y)坐标. 这一篇博客中,将演示如何使用NumPy数组切片魔术来分别访问每个面部部分并提取眼睛,眉毛,鼻子,嘴巴和下巴的特征. 1. 效果图 先上一张检测完的图: 也可以每一部分先标识出来: 2. 原理 面部标志主要是: 口 右眉 左眉 右眼 左眼 鼻子 下颚线 这一节即提取这些部分: 从图中可以看到假设是以0为下标的数

  • Python OpenCV使用dlib进行多目标跟踪详解

    目录 1.使用dlib进行多目标跟踪 2.项目结构 3.dlib多对象跟踪的简单“朴素”方法 4.快速.高效的dlib多对象跟踪实现 5.完整代码 6.改进和建议 在本教程中,您将学习如何使用 dlib 库在实时视频中有效地跟踪多个对象. 我们当然可以使用 dlib 跟踪多个对象:但是,为了获得可能的最佳性能,我们需要利用多处理并将对象跟踪器分布在处理器的多个内核上. 正确利用多处理使我们能够将 dlib 多对象跟踪每秒帧数 (FPS) 提高 45% 以上! 1.使用 dlib 进行多目标跟踪

  • Python+Opencv实战之人脸追踪详解

    目录 前言 人脸追踪技术简介 使用基于 dlib DCF 的跟踪器进行人脸跟踪 使用基于 dlib DCF 的跟踪器进行对象跟踪 小结 前言 人脸处理是人工智能中的一个热门话题,人脸处理可以使用计算机视觉算法从人脸中自动提取大量信息,例如身份.意图和情感:而目标跟踪试图估计目标在整个视频序列中的轨迹,其中只有目标的初始位置是已知的,将这两者进行结合将产生许多有趣的应用.由于外观变化.遮挡.快速运动.运动模糊和比例变化等多种因素,人脸追踪非常具有挑战性. 人脸追踪技术简介 基于判别相关滤波器 (d

  • Python OpenCV学习之图像滤波详解

    目录 背景 一.卷积相关概念 二.卷积实战 三.均值滤波 四.高斯滤波 五.中值滤波 六.双边滤波 七.Sobel算子 八.Scharr算子 九.拉普拉斯算子 十.Canny算法 背景 图像滤波的作用简单来说就是将一副图像通过滤波器得到另一幅图像:明确一个概念,滤波器又被称为卷积核,滤波的过程又被称为卷积:实际上深度学习就是训练许多适应任务的滤波器,本质上就是得到最佳的参数:当然在深度学习之前,也有一些常见的滤波器,本篇主要介绍这些常见的滤波器: 一.卷积相关概念 卷积核大小一般为奇数的原因:

  • Python OpenCV实现图像模板匹配详解

    目录 1.什么是模板匹配及模板匹配方法matchTemplate() 介绍 素材准备 2.单模板匹配 2.1 单目标匹配 2.2 多目标匹配 3.多模板匹配 1.什么是模板匹配及模板匹配方法matchTemplate() 介绍 提供一个模板图像,一个目标图像,且满足模板图像是目标图像的一部分,从目标图像中寻找特定的模板图像的过程,即为模板匹配.OpenCV提供了matchTemplate()方法帮助我们实现模板匹配. 该方法语法如下: cv2.matchTemplate(image, templ

  • Python OpenCV之常用滤波器使用详解

    目录 1. 滤波器 1.1 什么是滤波器 1.2 关于滤波核 1.3 素材选择 2.均值滤波器 cv2.blur() 2.1 语法简介 2.2 代码示例 3. 中值滤波器 cv2.medianBlur() 代码示例 4. 高斯滤波器 cv2.GaussianBlur() 5. 双边滤波器 cv2.bilateralFilter() 1. 滤波器 1.1 什么是滤波器 滤波器是对图像做平滑处理 的一种常用工具. 平滑处理即在尽可能地保留原图像信息的情况下,对像素值进行微调,使邻近的像素值之间,值的

  • Python OpenCV实现图形检测示例详解

    目录 1. 轮廓识别与描绘 1.1 cv2.findComtours()方法 1.2 cv2.drawContours() 方法 1.3 代码示例 2. 轮廓拟合 2.1 矩形包围框拟合 - cv2.boundingRect() 2.2圆形包围框拟合 - cv2.minEnclosingCircle() 3. 凸包 绘制 4. Canny边缘检测 - cv2.Canny() 4.1 cv2.Canny() 用法简介 4.2 代码示例 5. 霍夫变换 5.1 概述 5.2 cv2.HoughLin

  • Python+OpenCV实现图像识别替换功能详解

    OpenCV-Python是一个Python库,旨在解决计算机视觉问题. OpenCV是一个开源的计算机视觉库,1999年由英特尔的Gary Bradski启动.Bradski在访学过程中注意到,在很多优秀大学的实验室中,都有非常完备的内部公开的计算机视觉接口.这些接口从一届学生传到另一届学生,对于刚入门的新人来说,使用这些接口比重复造轮子方便多了.这些接口可以让他们在之前的基础上更有效地开展工作.OpenCV正是基于为计算机视觉提供通用接口这一目标而被策划的. 安装opencv pip3 in

  • Python+OpenCV读写视频的方法详解

    目录 读视频,提取帧 接口函数:cv2.VideoCapture() 获取视频信息 使用set(cv2.CAP_PROP_POS_FRAMES)读取指定帧 读取函数(重点) 将图像写为视频 示例 fourcc 读视频,提取帧 接口函数:cv2.VideoCapture() 通过video_capture = cv2.VideoCapture(video_path)可以获取读取视频的句柄.而后再通过flag, frame = video_capture.read()可以读取当前帧,flag表示读取

  • Python+Opencv实现图像模板匹配详解

    目录 引言 一.匹配方法 二.匹配单个对象 三.匹配多个对象 引言 什么是模板匹配呢? 看到这里大家是否会觉得很熟悉的感觉涌上心头!在人脸识别是不是也会看见 等等. 模板匹配可以看作是对象检测的一种非常基本的形式.使用模板匹配,我们可以使用包含要检测对象的“模板”来检测输入图像中的对象. 一.匹配方法 cv2.matchTemplate(img, templ, method) 参数:(img: 原始图像.temple: 模板图像.method: 匹配度计算方法) 方法如下: cv2.TM_SQD

  • python Opencv实现停车位识别思路详解

    目录 1.实现的思路 2.整体代码实战 3.停车位视频下载 1.实现的思路 (1)首先使用一个处理画框的程序,将图片中的有车和无车的停车位给画出来,并且保存坐标(如果画错了,将鼠标移至要删除的框中,右击鼠标,即可删除): #定义回调函数 def mouseClick(events,x,y,flags,params): #按下鼠标左键,将点击的坐标(x,y)保存到position列表中 if (events&cv2.EVENT_LBUTTONDOWN==cv2.EVENT_LBUTTONDOWN)

随机推荐