This article was published as a part of the Data Science Blogathon.
Artificial neural networks have reshaped the AI industry. Deep learning, the branch of machine learning built on these networks, helps us solve complex, real-time, industry-relevant problems.
Today we will develop a people counting and tracking system. We draw a reference line on the frame: if a person crosses the line moving down, we increment the down counter, and if a person crosses it moving up, we increment the up counter.
Content
1. Methodology
2. Project Requirements
3. Tracking and Counting
4. Centroid Tracker
5. Pre-Requisite files
6. Final Project
Methodology for People Counting and Tracking Project
The objective is to build a system that has the following features.
- Read the frames from the Video.
- Draw a desired reference line on the input frame.
- Detect the people using the object detection model.
- Mark the centroid on the detected person.
- Track the movement of that marked centroid.
- Calculate the direction of centroid movement (whether it is moving upwards or downwards).
- Count the number of people coming in or going out of a reference line.
- Based on the counting, increment the up or down counter.
These are the basic steps we need to follow to build the deep learning project. Now let us move on to the requirements.
People Counting and Tracking Project Requirements
We can build the deep learning project locally since it does not have many dependencies.
Install the libraries:
- pip install numpy
- pip install scipy (centroidtracker.py imports scipy.spatial)
- pip install opencv-python==3.4.2.16
We also need a Python file for the centroid tracker (centroidtracker.py).
We will use YOLO v3 as the model for detecting people in the frame, so we need to download the YOLO v3 weights, the YOLO v3 configuration file, and the COCO class list (the coco.names file). You can download the same from here.
Here is the test video.
Tracking and Counting
Object tracking methods such as Deep SORT, the centroid tracker, CSRT, KCF, and CamShift track a detected object by comparing detections with each other across the processed frames. If a detection matches a tracked object closely enough from one frame to the next, the tracker follows that object through the sequence of frames and retains the same object ID for it. This constant object ID makes the counting operations much easier.
We can use any one of the above-mentioned methods for tracking. Deep SORT is a common choice and, in our experience, gives very good output compared to the other trackers, but its major drawback is that it needs to be fine-tuned for a custom object tracking application. Here we will use the centroid tracker in our project. Once we have a stable hold on each detected object throughout the frames, we can perform the counting (IN-OUT) operation on it.
Centroid Tracker
The centroid tracker has the following steps:
- Accept the bounding box coordinates and compute the centroid.
  - The algorithm takes the bounding box coordinates xmin, ymin, xmax, and ymax and returns the (x_center, y_center) coordinates for each detected object in each frame.
  - The centroid is calculated as follows:
    X_cen = (xmin + xmax) // 2
    Y_cen = (ymin + ymax) // 2
    where xmin, ymin, xmax, and ymax are the bounding box coordinates from the object detection model (here YOLO v3).
- Calculate the Euclidean distance between each newly detected centroid and each existing object's centroid.
- Update the centroid of the existing object. Each new centroid is matched to the closest existing object, and that object's centroid is moved to the new position, thereby tracking the object.
- Register new objects. When a new object enters the frame, or an object reappears after it has been de-registered, the centroid tracker registers it with a new unique ID that other parts of the application can rely on.
- De-register old objects. Once an object has been missing for more than a set number of consecutive frames (maxDisappeared), the algorithm de-registers its object ID, meaning the object has left the frame or is no longer available.
This is the basic operation of centroid tracking.
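The first two steps can be sketched in plain Python. This is only a minimal illustration, not the tracker used in this project (that one appears in the pre-requisite files section and relies on NumPy and SciPy). The helper names `box_centroid` and `match_centroids` and the sample coordinates are made up for the example, and the matching here is a simple global greedy pairing, which is a little different from the row-by-row matching in centroidtracker.py.

```python
import math


def box_centroid(box):
    """Return the integer centroid (x_cen, y_cen) of (xmin, ymin, xmax, ymax)."""
    xmin, ymin, xmax, ymax = box
    return ((xmin + xmax) // 2, (ymin + ymax) // 2)


def match_centroids(tracked, detections, max_distance=50):
    """Greedily pair tracked object IDs with new centroids by Euclidean distance.

    tracked: dict mapping object ID -> (x, y)
    detections: list of (x, y) centroids from the current frame
    Returns (matches, unmatched) where matches maps object ID -> detection
    index and unmatched lists the detection indexes left over.
    """
    # Build every (distance, object ID, detection index) candidate pair.
    pairs = []
    for object_id, (ox, oy) in tracked.items():
        for j, (dx, dy) in enumerate(detections):
            pairs.append((math.hypot(ox - dx, oy - dy), object_id, j))

    matches = {}
    used = set()
    # Closest pairs first; each object and each detection is used at most once.
    for distance, object_id, j in sorted(pairs):
        if distance > max_distance:
            break  # every remaining pair is even further apart
        if object_id in matches or j in used:
            continue
        matches[object_id] = j
        used.add(j)

    unmatched = [j for j in range(len(detections)) if j not in used]
    return matches, unmatched


# Hypothetical data: two tracked people and three detections in the new frame.
tracked = {0: (100, 200), 1: (300, 220)}
boxes = [(280, 180, 330, 270), (90, 150, 120, 260), (500, 100, 540, 200)]
detections = [box_centroid(b) for b in boxes]
matches, unmatched = match_centroids(tracked, detections)
print(detections)
print(matches, unmatched)
```

In this example the two nearby detections keep the existing IDs, and the third detection is left unmatched, which is the case where a tracker would register a new object.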
For object detection we will use the YOLO v3 model to detect the person class in the frame. Download the YOLO weights and configuration (.cfg) file as described in the requirements section above.
Pre-Requisite files for People Counting and Tracking
Create a folder named ‘utils’. Inside it, create two Python files: ‘centroidtracker.py’ and ‘object_trackable.py’.
Open the centroidtracker.py file and copy the content below into it. It follows the centroid tracker steps discussed in the section above. The CentroidTracker class has three main functions: register, which registers a detected object; deregister, which removes an object from the registry; and update, which updates the position of each detected object from frame to frame.
###########################
# centroidtracker.py
###########################
# import the necessary packages
from scipy.spatial import distance as dist
from collections import OrderedDict
import numpy as np


class CentroidTracker:
    def __init__(self, maxDisappeared=50, maxDistance=50):
        # initialize the next unique object ID along with two ordered
        # dictionaries used to keep track of mapping a given object
        # ID to its centroid and number of consecutive frames it has
        # been marked as "disappeared", respectively
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()

        # store the number of maximum consecutive frames a given
        # object is allowed to be marked as "disappeared" until we
        # need to deregister the object from tracking
        self.maxDisappeared = maxDisappeared

        # store the maximum distance between centroids to associate
        # an object -- if the distance is larger than this maximum
        # distance we'll start to mark the object as "disappeared"
        self.maxDistance = maxDistance

    def register(self, centroid):
        # when registering an object we use the next available object
        # ID to store the centroid
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):
        # to deregister an object ID we delete the object ID from
        # both of our respective dictionaries
        del self.objects[objectID]
        del self.disappeared[objectID]

    def update(self, rects):
        # check to see if the list of input bounding box rectangles
        # is empty
        if len(rects) == 0:
            # loop over any existing tracked objects and mark them
            # as disappeared
            for objectID in list(self.disappeared.keys()):
                self.disappeared[objectID] += 1

                # if we have reached a maximum number of consecutive
                # frames where a given object has been marked as
                # missing, deregister it
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)

            # return early as there are no centroids or tracking info
            # to update
            return self.objects

        # initialize an array of input centroids for the current frame
        inputCentroids = np.zeros((len(rects), 2), dtype="int")

        # loop over the bounding box rectangles
        for (i, (startX, startY, endX, endY)) in enumerate(rects):
            # use the bounding box coordinates to derive the centroid
            cX = int((startX + endX) / 2.0)
            cY = int((startY + endY) / 2.0)
            inputCentroids[i] = (cX, cY)

        # if we are currently not tracking any objects take the input
        # centroids and register each of them
        if len(self.objects) == 0:
            for i in range(0, len(inputCentroids)):
                self.register(inputCentroids[i])

        # otherwise, we are currently tracking objects so we need to
        # try to match the input centroids to existing object
        # centroids
        else:
            # grab the set of object IDs and corresponding centroids
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())

            # compute the distance between each pair of object
            # centroids and input centroids, respectively -- our
            # goal will be to match an input centroid to an existing
            # object centroid
            D = dist.cdist(np.array(objectCentroids), inputCentroids)

            # in order to perform this matching we must (1) find the
            # smallest value in each row and then (2) sort the row
            # indexes based on their minimum values so that the row
            # with the smallest value is at the *front* of the index
            # list
            rows = D.min(axis=1).argsort()

            # next, we find the column holding the smallest value in
            # each row and order those column indexes using the
            # previously computed row index list
            cols = D.argmin(axis=1)[rows]

            # in order to determine if we need to update, register,
            # or deregister an object we need to keep track of which
            # of the rows and column indexes we have already examined
            usedRows = set()
            usedCols = set()

            # loop over the combination of the (row, column) index
            # tuples
            for (row, col) in zip(rows, cols):
                # if we have already examined either the row or
                # column value before, ignore it
                if row in usedRows or col in usedCols:
                    continue

                # if the distance between centroids is greater than
                # the maximum distance, do not associate the two
                # centroids to the same object
                if D[row, col] > self.maxDistance:
                    continue

                # otherwise, grab the object ID for the current row,
                # set its new centroid, and reset the disappeared
                # counter
                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.disappeared[objectID] = 0

                # indicate that we have examined each of the row and
                # column indexes, respectively
                usedRows.add(row)
                usedCols.add(col)

            # compute both the row and column index we have NOT yet
            # examined
            unusedRows = set(range(0, D.shape[0])).difference(usedRows)
            unusedCols = set(range(0, D.shape[1])).difference(usedCols)

            # in the event that the number of object centroids is
            # equal or greater than the number of input centroids
            # we need to check and see if some of these objects have
            # potentially disappeared
            if D.shape[0] >= D.shape[1]:
                # loop over the unused row indexes
                for row in unusedRows:
                    # grab the object ID for the corresponding row
                    # index and increment the disappeared counter
                    objectID = objectIDs[row]
                    self.disappeared[objectID] += 1

                    # check to see if the number of consecutive
                    # frames the object has been marked "disappeared"
                    # for warrants deregistering the object
                    if self.disappeared[objectID] > self.maxDisappeared:
                        self.deregister(objectID)

            # otherwise, if the number of input centroids is greater
            # than the number of existing object centroids we need to
            # register each new input centroid as a trackable object
            else:
                for col in unusedCols:
                    self.register(inputCentroids[col])

        # return the set of trackable objects
        return self.objects
Now, open the object_trackable.py file and copy the below content. Here inside the class, we will store the object ID, then initialize a list of centroids using the current centroid. Then, we will initialize a boolean which is used to indicate if the object has already been counted or not.
###########################
# object_trackable.py
###########################
class TrackableObject:
    def __init__(self, objectID, centroid):
        # store the object ID, then initialize a list of centroids
        # using the current centroid
        self.objectID = objectID
        self.centroids = [centroid]

        # initialize a boolean used to indicate if the object has
        # already been counted or not
        self.counted = False
Then move the YOLO v3 weights and configuration file, coco.names file and the input test video to the same project directory. Once everything is ready the project directory will look something like this.
Final People Counting and Tracking Project
Create a python file and start writing the detection script for the deep learning project.
First, we will import the required libraries and we will import class CentroidTracker and TrackableObject from the utils folder.
import cv2
import argparse
import sys
import numpy as np
import os.path
import math
from utils.centroidtracker import CentroidTracker
from utils.object_trackable import TrackableObject
Then, initialize the detection parameters and the width and height of the network's input image.
# Initialize the parameters
confThreshold = 0.6  # Confidence threshold
nmsThreshold = 0.4   # Non-maximum suppression threshold
inpWidth = 416       # Width of network's input image
inpHeight = 416      # Height of network's input image
Now, we will create an argument parser so that we can pass the test video to the script as an input argument. Then we will read the coco.names file and store the class names in a variable. Finally, we give the names of the configuration and weight files that will be used to load the network.
parser = argparse.ArgumentParser(description='People Tracking and Counting Project')
parser.add_argument('--video', help='Path to video file.')
args = parser.parse_args()

# Load names of classes, one per line
classesFile = "coco.names"
classes = None
with open(classesFile, 'rt') as f:
    classes = f.read().rstrip('\n').split('\n')

modelConfiguration = "yolov3.cfg"
modelWeights = "yolov3.weights"
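The class list matters later, because the detection code keeps only boxes whose class ID is 0. The sketch below shows why that ID corresponds to "person": coco.names holds one class name per line, and "person" is on the first line. The helper `parse_class_names` and the shortened sample text are assumptions made for this illustration; the real file has 80 entries.

```python
def parse_class_names(text):
    """Split the contents of a names file into a list, one class per line."""
    return [line.strip() for line in text.splitlines() if line.strip()]


# Shortened stand-in for the contents of coco.names (hypothetical sample).
sample = "person\nbicycle\ncar\nmotorbike\n"
classes = parse_class_names(sample)

# The position of a name in the list is the class ID the network reports.
person_id = classes.index("person")
print(classes, person_id)
```

Looking the ID up by name like this is a small safeguard if you ever swap in a model trained on a different class list.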
Load the serialized model, then initialize the video writer and the frame dimensions (we’ll set them as soon as we read the first frame from the video).
print("[INFO] loading model...")
net = cv2.dnn.readNetFromDarknet(modelConfiguration, modelWeights)
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_OPENCL)

writer = None

W = None
H = None
Now, instantiate the centroid tracker, then initialize a list for trackers (it is not used further in this script) and a dictionary that maps the unique object ID of each detection to a TrackableObject. Then, initialize the total number of objects that have moved either up or down.
ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
trackers = []
trackableObjects = {}

totalDown = 0
totalUp = 0
We will write a small function to get the names of the output layers.
def getOutputsNames(net):
    # Get the names of all the layers in the network
    layersNames = net.getLayerNames()
    # Get the names of the output layers, i.e. the layers with unconnected outputs.
    # flatten() copes with the indices being returned either nested or flat,
    # which differs between OpenCV versions.
    return [layersNames[i - 1] for i in np.array(net.getUnconnectedOutLayers()).flatten()]
Now, let us write a postprocessing function that removes the bounding boxes with low confidence and applies non-max suppression. Inside this function, we scan through all the bounding boxes output by the network and keep only the ones with high confidence scores. Then we perform non-max suppression to eliminate redundant overlapping boxes with lower confidences. Finally, for every box of the desired class (person), we use the centroid tracker to associate the old object centroids with the newly computed object centroids.
def postprocess(frame, outs):
    frameHeight = frame.shape[0]
    frameWidth = frame.shape[1]

    rects = []
    classIds = []
    confidences = []
    boxes = []
    for out in outs:
        for detection in out:
            scores = detection[5:]
            classId = np.argmax(scores)
            confidence = scores[classId]
            if confidence > confThreshold:
                center_x = int(detection[0] * frameWidth)
                center_y = int(detection[1] * frameHeight)
                width = int(detection[2] * frameWidth)
                height = int(detection[3] * frameHeight)
                left = int(center_x - width / 2)
                top = int(center_y - height / 2)
                classIds.append(classId)
                confidences.append(float(confidence))
                boxes.append([left, top, width, height])

    indices = cv2.dnn.NMSBoxes(boxes, confidences, confThreshold, nmsThreshold)
    # flatten() copes with the indices being returned either nested or flat,
    # which differs between OpenCV versions
    for i in np.array(indices).flatten():
        box = boxes[i]
        left = box[0]
        top = box[1]
        width = box[2]
        height = box[3]
        # Class "person"
        if classIds[i] == 0:
            rects.append((left, top, left + width, top + height))

    objects = ct.update(rects)
    counting(objects)
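To make the two ideas inside `postprocess` more concrete, here is a small pure-Python sketch of the box conversion and of greedy non-max suppression. It is a textbook illustration under stated assumptions, not a description of how `cv2.dnn.NMSBoxes` is implemented internally; the function names `to_corner_box`, `iou`, and `greedy_nms` and the sample boxes are invented for the example.

```python
def to_corner_box(detection, frame_w, frame_h):
    """Convert a normalised (cx, cy, w, h) detection to pixel [left, top, width, height]."""
    cx, cy, w, h = detection[:4]
    center_x = int(cx * frame_w)
    center_y = int(cy * frame_h)
    width = int(w * frame_w)
    height = int(h * frame_h)
    left = int(center_x - width / 2)
    top = int(center_y - height / 2)
    return [left, top, width, height]


def iou(a, b):
    """Intersection over union of two [left, top, width, height] boxes."""
    ax2, ay2 = a[0] + a[2], a[1] + a[3]
    bx2, by2 = b[0] + b[2], b[1] + b[3]
    inter_w = max(0, min(ax2, bx2) - max(a[0], b[0]))
    inter_h = max(0, min(ay2, by2) - max(a[1], b[1]))
    inter = inter_w * inter_h
    union = a[2] * a[3] + b[2] * b[3] - inter
    return inter / union if union > 0 else 0.0


def greedy_nms(boxes, scores, score_threshold, iou_threshold):
    """Keep high-scoring boxes, dropping any box that overlaps a kept box too much."""
    # Indexes of boxes above the score threshold, best score first.
    order = sorted(
        (i for i, s in enumerate(scores) if s > score_threshold),
        key=lambda i: scores[i],
        reverse=True,
    )
    keep = []
    for i in order:
        if all(iou(boxes[i], boxes[k]) <= iou_threshold for k in keep):
            keep.append(i)
    return keep


# Hypothetical detections on a 640x480 frame: two overlapping boxes and one separate box.
first = to_corner_box((0.5, 0.5, 0.25, 0.5), 640, 480)
boxes = [first, [250, 130, 160, 240], [20, 40, 80, 200]]
scores = [0.9, 0.7, 0.8]
kept = greedy_nms(boxes, scores, score_threshold=0.6, iou_threshold=0.4)
print(first, kept)
```

The second box overlaps the first heavily and has a lower score, so only the first and third boxes survive. In the project itself this filtering is done by `cv2.dnn.NMSBoxes` with `confThreshold` and `nmsThreshold`.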
Write a counting function that loops over the tracked objects in the frame and checks whether a trackable object already exists for each object ID; if there is none, it creates one. If there is a trackable object, we find the direction in which its centroid is moving in order to decide whether it is going up or down. We then check whether the object has already been counted, store the trackable object in a dictionary, and draw the necessary details on the frame.
def counting(objects):
    # note: this function draws on the global 'frame' set in the main loop
    frameHeight = frame.shape[0]
    frameWidth = frame.shape[1]

    global totalDown
    global totalUp

    # loop over the tracked objects
    for (objectID, centroid) in objects.items():
        to = trackableObjects.get(objectID, None)

        # if there is no existing trackable object, create one
        if to is None:
            to = TrackableObject(objectID, centroid)
        else:
            # negative direction means moving up, positive means moving down
            y = [c[1] for c in to.centroids]
            direction = centroid[1] - np.mean(y)
            print(direction)
            to.centroids.append(centroid)

            # check to see if the object has been counted or not
            if not to.counted:
                if direction < 0 and centroid[1] in range(frameHeight//2 - 30, frameHeight//2 + 30):
                    totalUp += 1
                    to.counted = True
                elif direction > 0 and centroid[1] in range(frameHeight//2 - 30, frameHeight//2 + 30):
                    totalDown += 1
                    to.counted = True

        # store the trackable object in our dictionary
        trackableObjects[objectID] = to
        cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)

    info = [
        ("Up", totalUp),
        ("Down", totalDown),
    ]

    # loop over the info tuples and draw them on our frame
    for (i, (k, v)) in enumerate(info):
        text = "{}: {}".format(k, v)
        cv2.putText(frame, text, (10, frameHeight - ((i * 20) + 20)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
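The counting rule can be isolated from OpenCV and tried on plain numbers. The sketch below is an illustration of the same rule, assuming the reference line sits at half the frame height with a 30-pixel margin as in the script; the function names and the sample y values are made up for the example.

```python
def movement_direction(previous_ys, current_y):
    """Current y minus the mean of earlier y values.

    Image y grows downwards, so a negative result means the centroid is
    moving up and a positive result means it is moving down.
    """
    return current_y - sum(previous_ys) / len(previous_ys)


def near_line(y, line_y, margin=30):
    """True when y lies in [line_y - margin, line_y + margin)."""
    return line_y - margin <= y < line_y + margin


def classify_crossing(previous_ys, current_y, frame_height):
    """Return 'up', 'down', or None for one tracked centroid."""
    if not near_line(current_y, frame_height // 2):
        return None
    direction = movement_direction(previous_ys, current_y)
    if direction < 0:
        return "up"
    if direction > 0:
        return "down"
    return None


# Hypothetical centroid histories on a 480-pixel-high frame (line at y = 240).
rising = classify_crossing([300, 280, 260], 245, 480)
falling = classify_crossing([180, 200, 220], 250, 480)
far_away = classify_crossing([100, 110], 120, 480)
print(rising, falling, far_away)
```

A centroid is only counted while it is inside the band around the line, which is why the third history produces no count even though it is moving down.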
Now, we will take the input. It can be a video or a real-time input from the webcam and also we will use cv2.VideoWriter to save the output video.
# Process inputs
winName = 'People Counting and Tracking System'
cv2.namedWindow(winName, cv2.WINDOW_NORMAL)

outputFile = "yolo_out_py.avi"
if (args.video):
    # Open the video file
    if not os.path.isfile(args.video):
        print("Input video file ", args.video, " doesn't exist")
        sys.exit(1)
    cap = cv2.VideoCapture(args.video)
    outputFile = args.video[:-4]+'_output.avi'
else:
    # Webcam input
    cap = cv2.VideoCapture(0)

# Get the video writer initialized to save the output video
vid_writer = cv2.VideoWriter(outputFile, cv2.VideoWriter_fourcc('M','J','P','G'), 30,
                             (round(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), round(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))
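One detail worth noting: `args.video[:-4]` assumes the input file has a three-letter extension such as .mp4 or .avi. As a possible alternative, the output name could be built with `os.path.splitext`, which handles extensions of any length. The helper name `output_path_for` is hypothetical and not part of the original script.

```python
import os.path


def output_path_for(video_path, suffix="_output.avi"):
    """Build the output file name by replacing the input file's extension."""
    root, _ext = os.path.splitext(video_path)
    return root + suffix


print(output_path_for("test.mp4"))
print(output_path_for("clips/entrance.mpeg"))
```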
Now for the main part: inside the while loop we integrate everything. First we read a frame from the video feed and stop if we have reached the end of the video. Then we draw a reference line on the frame so that we can keep the up and down counts with respect to that line. Next we create a blob from the frame, set it as the network input, run the forward pass, and use the postprocess function to remove bounding boxes with low confidence. Finally we write the frame to the output video and display it.
while cv2.waitKey(1) < 0:
    # get frame from the video
    hasFrame, frame = cap.read()

    # Stop the program if reached end of video
    # (checked before touching 'frame', which is None when no frame was read)
    if not hasFrame:
        print("Done processing !!!")
        print("Output file is stored as ", outputFile)
        cv2.waitKey(3000)
        # Release device
        cap.release()
        break

    frameHeight = frame.shape[0]
    frameWidth = frame.shape[1]
    cv2.line(frame, (0, frameHeight // 2), (frameWidth, frameHeight // 2), (0, 255, 255), 2)

    # Create a 4D blob from a frame.
    blob = cv2.dnn.blobFromImage(frame, 1/255, (inpWidth, inpHeight), [0,0,0], 1, crop=False)

    # Sets the input to the network
    net.setInput(blob)

    # Runs the forward pass to get output of the output layers
    outs = net.forward(getOutputsNames(net))

    # Remove the bounding boxes with low confidence
    postprocess(frame, outs)

    # Put efficiency information. The function getPerfProfile returns the overall time
    # for inference(t) and the timings for each of the layers(in layersTimes)
    t, _ = net.getPerfProfile()
    label = 'Inference time: %.2f ms' % (t * 1000.0 / cv2.getTickFrequency())
    cv2.putText(frame, label, (0, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255))

    # Write the frame with the detection boxes
    vid_writer.write(frame.astype(np.uint8))

    cv2.imshow(winName, frame)

# Finalize the output video and close the display window
vid_writer.release()
cv2.destroyAllWindows()
After all these steps the project directory looks like this.
The entire code snippet for detection is given below.
import cv2
import argparse
import sys
import numpy as np
import os.path
import math
from utils.centroidtracker import CentroidTracker
from utils.object_trackable import TrackableObject

# Initialize the parameters
confThreshold = 0.6  # Confidence threshold
nmsThreshold = 0.4   # Non-maximum suppression threshold
inpWidth = 416       # Width of network's input image
inpHeight = 416      # Height of network's input image

parser = argparse.ArgumentParser(description='Object Detection using YOLO in OPENCV')
parser.add_argument('--video', help='Path to video file.')
args = parser.parse_args()

# Load names of classes
classesFile = "coco.names"
classes = None
with open(classesFile, 'rt') as f:
    classes = f.read().rstrip('\n').split('\n')

# Give the configuration and weight files for the model and load the network using them.
modelConfiguration = "yolov3.cfg"
modelWeights = "yolov3.weights"

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromDarknet(modelConfiguration, modelWeights)
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_OPENCL)

# initialize the video writer
writer = None

# initialize the frame dimensions (we'll set them as soon as we read
# the first frame from the video)
W = None
H = None

# instantiate our centroid tracker, then initialize a list for
# trackers (not used further in this script), followed by a dictionary
# to map each unique object ID to a TrackableObject
ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
trackers = []
trackableObjects = {}

# initialize the total number of objects that have moved either up or down
totalDown = 0
totalUp = 0


# Get the names of the output layers
def getOutputsNames(net):
    # Get the names of all the layers in the network
    layersNames = net.getLayerNames()
    # Get the names of the output layers, i.e. the layers with unconnected outputs.
    # flatten() copes with the indices being returned either nested or flat,
    # which differs between OpenCV versions.
    return [layersNames[i - 1] for i in np.array(net.getUnconnectedOutLayers()).flatten()]


# Remove the bounding boxes with low confidence using non-maxima suppression
def postprocess(frame, outs):
    frameHeight = frame.shape[0]
    frameWidth = frame.shape[1]

    rects = []

    # Scan through all the bounding boxes output from the network and keep only the
    # ones with high confidence scores. Assign the box's class label as the class
    # with the highest score.
    classIds = []
    confidences = []
    boxes = []
    for out in outs:
        for detection in out:
            scores = detection[5:]
            classId = np.argmax(scores)
            confidence = scores[classId]
            if confidence > confThreshold:
                center_x = int(detection[0] * frameWidth)
                center_y = int(detection[1] * frameHeight)
                width = int(detection[2] * frameWidth)
                height = int(detection[3] * frameHeight)
                left = int(center_x - width / 2)
                top = int(center_y - height / 2)
                classIds.append(classId)
                confidences.append(float(confidence))
                boxes.append([left, top, width, height])

    # Perform non maximum suppression to eliminate redundant overlapping boxes with
    # lower confidences.
    indices = cv2.dnn.NMSBoxes(boxes, confidences, confThreshold, nmsThreshold)
    for i in np.array(indices).flatten():
        box = boxes[i]
        left = box[0]
        top = box[1]
        width = box[2]
        height = box[3]
        # Class "person"
        if classIds[i] == 0:
            rects.append((left, top, left + width, top + height))

    # use the centroid tracker to associate the (1) old object
    # centroids with (2) the newly computed object centroids
    objects = ct.update(rects)
    counting(objects)


def counting(objects):
    # note: this function draws on the global 'frame' set in the main loop
    frameHeight = frame.shape[0]
    frameWidth = frame.shape[1]

    global totalDown
    global totalUp

    # loop over the tracked objects
    for (objectID, centroid) in objects.items():
        # check to see if a trackable object exists for the current
        # object ID
        to = trackableObjects.get(objectID, None)

        # if there is no existing trackable object, create one
        if to is None:
            to = TrackableObject(objectID, centroid)

        # otherwise, there is a trackable object so we can utilize it
        # to determine direction
        else:
            # the difference between the y-coordinate of the *current*
            # centroid and the mean of *previous* centroids will tell
            # us in which direction the object is moving (negative for
            # 'up' and positive for 'down')
            y = [c[1] for c in to.centroids]
            direction = centroid[1] - np.mean(y)
            print(direction)
            to.centroids.append(centroid)

            # check to see if the object has been counted or not
            if not to.counted:
                # if the direction is negative (indicating the object
                # is moving up) AND the centroid is within 30 pixels
                # of the center line, count the object
                if direction < 0 and centroid[1] in range(frameHeight//2 - 30, frameHeight//2 + 30):
                    totalUp += 1
                    to.counted = True

                # if the direction is positive (indicating the object
                # is moving down) AND the centroid is within 30 pixels
                # of the center line, count the object
                elif direction > 0 and centroid[1] in range(frameHeight//2 - 30, frameHeight//2 + 30):
                    totalDown += 1
                    to.counted = True

        # store the trackable object in our dictionary
        trackableObjects[objectID] = to

        # draw both the ID of the object and the centroid of the
        # object on the output frame
        #text = "ID {}".format(objectID)
        #cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
            #cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)

    # construct a tuple of information we will be displaying on the
    # frame
    info = [
        ("Up", totalUp),
        ("Down", totalDown),
    ]

    # loop over the info tuples and draw them on our frame
    for (i, (k, v)) in enumerate(info):
        text = "{}: {}".format(k, v)
        cv2.putText(frame, text, (10, frameHeight - ((i * 20) + 20)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)


# Process inputs
winName = 'People Counting and Tracking System'
cv2.namedWindow(winName, cv2.WINDOW_NORMAL)

outputFile = "yolo_out_py.avi"
if (args.video):
    # Open the video file
    if not os.path.isfile(args.video):
        print("Input video file ", args.video, " doesn't exist")
        sys.exit(1)
    cap = cv2.VideoCapture(args.video)
    outputFile = args.video[:-4]+'_output.avi'
else:
    # Webcam input
    cap = cv2.VideoCapture(0)

# Get the video writer initialized to save the output video
vid_writer = cv2.VideoWriter(outputFile, cv2.VideoWriter_fourcc('M','J','P','G'), 30,
                             (round(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), round(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))

while cv2.waitKey(1) < 0:
    # get frame from the video
    hasFrame, frame = cap.read()

    # Stop the program if reached end of video
    # (checked before touching 'frame', which is None when no frame was read)
    if not hasFrame:
        print("Done processing !!!")
        print("Output file is stored as ", outputFile)
        cv2.waitKey(3000)
        # Release device
        cap.release()
        break

    frameHeight = frame.shape[0]
    frameWidth = frame.shape[1]
    cv2.line(frame, (0, frameHeight // 2), (frameWidth, frameHeight // 2), (0, 255, 255), 2)

    # Create a 4D blob from a frame.
    blob = cv2.dnn.blobFromImage(frame, 1/255, (inpWidth, inpHeight), [0,0,0], 1, crop=False)

    # Sets the input to the network
    net.setInput(blob)

    # Runs the forward pass to get output of the output layers
    outs = net.forward(getOutputsNames(net))

    # Remove the bounding boxes with low confidence
    postprocess(frame, outs)

    # Put efficiency information. The function getPerfProfile returns the overall time
    # for inference(t) and the timings for each of the layers(in layersTimes)
    t, _ = net.getPerfProfile()
    label = 'Inference time: %.2f ms' % (t * 1000.0 / cv2.getTickFrequency())
    cv2.putText(frame, label, (0, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255))

    # Write the frame with the detection boxes
    vid_writer.write(frame.astype(np.uint8))

    cv2.imshow(winName, frame)

# Finalize the output video and close the display window
vid_writer.release()
cv2.destroyAllWindows()
Now, let us run our file. Just open the terminal and type the below command.
python counting_people.py --video D:/pycharmprojects/Counting-People/test.mp4
Above is the output frame. Once it runs on the complete video, the output video will be saved in the same project directory.
The entire project is available here.
Hope you enjoyed it!!
Conclusion
This deep learning project can be adapted to various applications and to objects other than people. The detection speed might be slow, but it covers the basics of object counting and tracking in a frame. You can also implement similar ideas with different trackers, such as Deep SORT or CSRT, and with different object detection models, and compare their inference speed and accuracy.
Note: All the images are created by the author.
Thank You.