Today we will explore how to implement road traffic counting based on computer vision.
In this tutorial, we will perform motion detection very simply using only Python and OpenCV with the help of a background subtraction algorithm.
We will cover the following four areas:
1. Main idea of background subtraction algorithm for object detection.
2. OpenCV image filters.
3. Detection of objects using contours.
4. Establishment of a structure for further data processing.
Background Subtraction Algorithm
There are many different background subtraction algorithms, but their main idea is simple.
Suppose we have a video of a room that contains no people or pets on certain frames; those frames are essentially static, and we call this static part the background (background_layer). So, to get the objects that are moving in the video, all we need to do is subtract the background from the current frame.
In practice we will not get perfectly static frames, because of lighting variations, objects moved by people, or the constant presence of moving people and pets. In that case, we collect a number of frames from the video, and if a pixel has the same value in the vast majority of them, we use that pixel as part of the background_layer.
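To make the idea concrete, here is a minimal sketch (not the article's code) of naive background subtraction against a single static background frame; the file names are placeholders:

import cv2

# placeholder file names, used only for illustration
background_layer = cv2.imread("background.jpg")
frame = cv2.imread("frame.jpg")

# subtract the background from the current frame
diff = cv2.absdiff(frame, background_layer)
gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)

# pixels that changed strongly enough are treated as moving objects
_, foreground_mask = cv2.threshold(gray, 25, 255, cv2.THRESH_BINARY)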
We will use the MOG2 algorithm for background subtraction.
raw frame
The code is shown below:
import os
import logging
import logging.handlers
import random

import numpy as np
import skvideo.io
import cv2
import matplotlib.pyplot as plt

import utils

# without this some strange errors happen
cv2.ocl.setUseOpenCL(False)
random.seed(123)

# ============================================================================
IMAGE_DIR = "./out"
VIDEO_SOURCE = "input.mp4"
SHAPE = (720, 1280)  # HxW
# ============================================================================


def train_bg_subtractor(inst, cap, num=500):
    '''
        BG subtractor needs to process some amount of frames
        before it starts giving results
    '''
    print('Training BG Subtractor...')
    i = 0
    for frame in cap:
        inst.apply(frame, None, 0.001)
        i += 1
        if i >= num:
            return cap


def main():
    log = logging.getLogger("main")

    # creating MOG2 bg subtractor with 500 frames in cache
    # and shadow detection
    bg_subtractor = cv2.createBackgroundSubtractorMOG2(
        history=500, detectShadows=True)

    # Set up image source
    # You can also use cv2.VideoCapture, but for some reason it was not working for me
    cap = skvideo.io.vreader(VIDEO_SOURCE)

    # skipping 500 frames to train bg subtractor
    train_bg_subtractor(bg_subtractor, cap, num=500)

    frame_number = -1
    for frame in cap:
        if not frame.any():
            log.error("Frame capture failed, stopping...")
            break

        frame_number += 1
        utils.save_frame(frame, "./out/frame_%04d.png" % frame_number)

        fg_mask = bg_subtractor.apply(frame, None, 0.001)
        utils.save_frame(fg_mask, "./out/fg_mask_%04d.png" % frame_number)

# ============================================================================

if __name__ == "__main__":
    log = utils.init_logging()

    if not os.path.exists(IMAGE_DIR):
        log.debug("Creating image directory `%s`...", IMAGE_DIR)
        os.makedirs(IMAGE_DIR)

    main()
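The utils module imported above is not reproduced in this article. Based on how it is used (init_logging, save_frame, and later get_centroid and distance), minimal stand-ins might look like the sketch below; the author's actual versions may differ:

import logging
import sys

import cv2
import numpy as np


def init_logging():
    # simple root logger writing to stdout
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    return logging.getLogger("main")


def save_frame(frame, file_name, flip=True):
    # skvideo yields RGB frames while cv2.imwrite expects BGR,
    # so reverse the channel order unless told otherwise
    if flip:
        cv2.imwrite(file_name, np.flip(frame, 2))
    else:
        cv2.imwrite(file_name, frame)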
After processing, we obtain the following foreground image:
Foreground image after removing background
We can see that there is some noise on the foreground image, which can be removed by standard filtering techniques.
Filtering
For our current situation, we will need the following filter functions: Threshold, Erode, Dilate, Opening, Closing.
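For reference, before we combine them, here is how those five names map onto plain OpenCV calls (a standalone sketch; the kernel size and threshold value here are illustrative, not the article's tuned parameters):

import cv2
import numpy as np

kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
mask = np.zeros((720, 1280), dtype='uint8')  # stand-in for a real foreground mask

_, th = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)  # Threshold
eroded = cv2.erode(th, kernel)                            # Erode: shrink regions
dilated = cv2.dilate(th, kernel)                          # Dilate: grow regions
opened = cv2.morphologyEx(th, cv2.MORPH_OPEN, kernel)     # Opening: erode, then dilate
closed = cv2.morphologyEx(th, cv2.MORPH_CLOSE, kernel)    # Closing: dilate, then erode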
First, we use "Closing" to fill gaps inside the regions, then "Opening" to remove isolated pixels, and then "Dilate" to expand the objects and make them thicker. The code is as follows:
def filter_mask(img):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))

    # Fill any small holes
    closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations=2)

    # threshold
    th = dilation.copy()
    th[th < 240] = 0

    return th
The mask after filtering looks as follows:
Object detection using contours
We will use the standard cv2.findContours function to detect the contours. The parameters we can choose when calling it are:
cv2.RETR_EXTERNAL - retrieves only the extreme outer contours.
cv2.CHAIN_APPROX_TC89_L1 - applies the Teh-Chin chain approximation algorithm (faster).
The code is as follows:
def get_centroid(x, y, w, h):
    x1 = int(w / 2)
    y1 = int(h / 2)

    cx = x + x1
    cy = y + y1

    return (cx, cy)


def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):
    matches = []

    # finding external contours (OpenCV 3.x returns 3 values here;
    # OpenCV 4.x returns only contours and hierarchy)
    im, contours, hierarchy = cv2.findContours(
        fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)

    # filtering by width, height
    for (i, contour) in enumerate(contours):
        (x, y, w, h) = cv2.boundingRect(contour)
        contour_valid = (w >= min_contour_width) and (
            h >= min_contour_height)

        if not contour_valid:
            continue

        # getting center of the bounding box
        centroid = get_centroid(x, y, w, h)

        matches.append(((x, y, w, h), centroid))

    return matches
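To make the flow concrete, here is a hypothetical wiring of the pieces defined so far, continuing from the main() loop above (frame and bg_subtractor are the objects created there):

fg_mask = bg_subtractor.apply(frame, None, 0.001)
fg_mask = filter_mask(fg_mask)

# each match is ((x, y, w, h), centroid)
for (x, y, w, h), centroid in detect_vehicles(fg_mask):
    print("vehicle bbox:", (x, y, w, h), "centroid:", centroid)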
Establishment of a data-processing framework
We all know that no single algorithm can handle every problem in ML and CV, and even if one existed, we wouldn't use it, because it would hardly be effective at scale. For example, Netflix once offered a $1 million prize for the best movie recommendation algorithm. A team won it, but their solution couldn't run at scale, so it was useless to the company, even though Netflix still paid out the prize.
Next we build the framework for solving the problem at hand, which makes it easier to work with the data:
class PipelineRunner(object):
    '''
        Very simple pipeline.

        Just runs the passed processors in order, passing the context from
        one to another.

        You can also set the log level for the processors.
    '''

    def __init__(self, pipeline=None, log_level=logging.DEBUG):
        self.pipeline = pipeline or []
        self.context = {}
        self.log = logging.getLogger(self.__class__.__name__)
        self.log.setLevel(log_level)
        self.log_level = log_level
        self.set_log_level()

    def set_context(self, data):
        self.context = data

    def add(self, processor):
        if not isinstance(processor, PipelineProcessor):
            raise Exception(
                'Processor should be an isinstance of PipelineProcessor.')
        processor.log.setLevel(self.log_level)
        self.pipeline.append(processor)

    def remove(self, name):
        for i, p in enumerate(self.pipeline):
            if p.__class__.__name__ == name:
                del self.pipeline[i]
                return True
        return False

    def set_log_level(self):
        for p in self.pipeline:
            p.log.setLevel(self.log_level)

    def run(self):
        for p in self.pipeline:
            self.context = p(self.context)

        self.log.debug("Frame #%d processed.", self.context['frame_number'])

        return self.context


class PipelineProcessor(object):
    '''
        Base class for processors.
    '''

    def __init__(self):
        self.log = logging.getLogger(self.__class__.__name__)
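As a quick illustration of the contract these two classes define, here is a toy processor plugged into the runner (FrameLogger is hypothetical, not part of the article):

import logging

import numpy as np


class FrameLogger(PipelineProcessor):
    # hypothetical demo processor: just logs the frame number
    def __call__(self, context):
        self.log.debug("processing frame #%d", context['frame_number'])
        return context


runner = PipelineRunner(pipeline=[FrameLogger()], log_level=logging.DEBUG)
runner.set_context({'frame': np.zeros((720, 1280, 3), 'uint8'), 'frame_number': 0})
runner.run()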
First we set up a list of processors that run in order. Each processor does one part of the work, and they execute in cascade, passing the context from one to the next, to produce the final result.
We start by creating the contour detection processor. It simply combines the previous background subtraction, filtering, and contour detection sections together, as shown in the code below:
class ContourDetection(PipelineProcessor):
    '''
        Detecting moving objects.

        The purpose of this processor is to subtract the background, get the
        moving objects, detect them with cv2.findContours, and then filter
        them by width and height.

        bg_subtractor - background subtractor isinstance.
        min_contour_width - min bounding rectangle width.
        min_contour_height - min bounding rectangle height.
        save_image - if True will save detected objects mask to file.
        image_dir - where to save images (must exist).
    '''

    def __init__(self, bg_subtractor, min_contour_width=35,
                 min_contour_height=35, save_image=False, image_dir='images'):
        super(ContourDetection, self).__init__()

        self.bg_subtractor = bg_subtractor
        self.min_contour_width = min_contour_width
        self.min_contour_height = min_contour_height
        self.save_image = save_image
        self.image_dir = image_dir

    def filter_mask(self, img, a=None):
        '''
            These filters are hand-picked just based on visual tests
        '''
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))

        # Fill any small holes
        closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
        # Remove noise
        opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

        # Dilate to merge adjacent blobs
        dilation = cv2.dilate(opening, kernel, iterations=2)

        return dilation

    def detect_vehicles(self, fg_mask, context):
        matches = []

        # finding external contours (OpenCV 3.x returns 3 values here;
        # OpenCV 4.x returns only contours and hierarchy)
        im2, contours, hierarchy = cv2.findContours(
            fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)

        for (i, contour) in enumerate(contours):
            (x, y, w, h) = cv2.boundingRect(contour)
            contour_valid = (w >= self.min_contour_width) and (
                h >= self.min_contour_height)

            if not contour_valid:
                continue

            centroid = utils.get_centroid(x, y, w, h)

            matches.append(((x, y, w, h), centroid))

        return matches

    def __call__(self, context):
        frame = context['frame'].copy()
        frame_number = context['frame_number']

        fg_mask = self.bg_subtractor.apply(frame, None, 0.001)

        # just thresholding values
        fg_mask[fg_mask < 240] = 0

        fg_mask = self.filter_mask(fg_mask, frame_number)

        if self.save_image:
            utils.save_frame(fg_mask, self.image_dir +
                             "/mask_%04d.png" % frame_number, flip=False)

        context['objects'] = self.detect_vehicles(fg_mask, context)
        context['fg_mask'] = fg_mask

        return context
Now, let's create a processor that will match the objects detected on different frames, build paths, and count the vehicles that reach the exit area. The code is shown below:
class VehicleCounter(PipelineProcessor):
    '''
        Counting vehicles that entered the exit zone.

        The purpose of this class is to use the detected objects and a local
        cache to create object paths and count the ones that entered the exit
        zone defined by the exit masks.

        exit_masks - list of the exit masks.
        path_size - max number of points in a path.
        max_dst - max distance between two points.
    '''

    def __init__(self, exit_masks=[], path_size=10, max_dst=30,
                 x_weight=1.0, y_weight=1.0):
        super(VehicleCounter, self).__init__()

        self.exit_masks = exit_masks

        self.vehicle_count = 0
        self.path_size = path_size
        self.pathes = []
        self.max_dst = max_dst
        self.x_weight = x_weight
        self.y_weight = y_weight

    def check_exit(self, point):
        for exit_mask in self.exit_masks:
            try:
                if exit_mask[point[1]][point[0]] == 255:
                    return True
            except:
                return True
        return False

    def __call__(self, context):
        objects = context['objects']
        context['exit_masks'] = self.exit_masks
        context['pathes'] = self.pathes
        context['vehicle_count'] = self.vehicle_count
        if not objects:
            return context

        points = np.array(objects)[:, 0:2]
        points = points.tolist()

        # add new points if pathes is empty
        if not self.pathes:
            for match in points:
                self.pathes.append([match])
        else:
            # link new points with old pathes based on minimum distance between
            # points
            new_pathes = []

            for path in self.pathes:
                _min = 999999
                _match = None
                for p in points:
                    if len(path) == 1:
                        # distance from last point to current
                        d = utils.distance(p[0], path[-1][0])
                    else:
                        # based on 2 prev points predict next point and calculate
                        # distance from predicted next point to current
                        xn = 2 * path[-1][0][0] - path[-2][0][0]
                        yn = 2 * path[-1][0][1] - path[-2][0][1]
                        d = utils.distance(
                            p[0], (xn, yn),
                            x_weight=self.x_weight,
                            y_weight=self.y_weight
                        )

                    if d < _min:
                        _min = d
                        _match = p

                if _match and _min <= self.max_dst:
                    points.remove(_match)
                    path.append(_match)
                    new_pathes.append(path)

                # do not drop path if current frame has no matches
                if _match is None:
                    new_pathes.append(path)

            self.pathes = new_pathes

            # add new pathes
            if len(points):
                for p in points:
                    # do not add points that already should be counted
                    if self.check_exit(p[1]):
                        continue
                    self.pathes.append([p])

        # save only last N points in path
        for i, _ in enumerate(self.pathes):
            self.pathes[i] = self.pathes[i][self.path_size * -1:]

        # count vehicles and drop counted pathes:
        new_pathes = []
        for i, path in enumerate(self.pathes):
            d = path[-2:]

            if (
                # need at least two points to count
                len(d) >= 2 and
                # prev point not in exit zone
                not self.check_exit(d[0][1]) and
                # current point in exit zone
                self.check_exit(d[1][1]) and
                # path len is bigger than min
                self.path_size <= len(path)
            ):
                self.vehicle_count += 1
            else:
                # prevent linking with a path that is already in the exit zone
                add = True
                for p in path:
                    if self.check_exit(p[1]):
                        add = False
                        break
                if add:
                    new_pathes.append(path)

        self.pathes = new_pathes

        context['pathes'] = self.pathes
        context['objects'] = objects
        context['vehicle_count'] = self.vehicle_count

        self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)

        return context
The code above is a bit complex, so let's go over it part by part.
The green part in the image above is the exit area. This is where we count vehicles, and we only count them if their path inside this zone is longer than 3 points.
We use a mask to solve this problem because it is much more efficient and simpler than a vector-based algorithm: a simple "binary and" check tells us whether the centre point of a vehicle lies inside the exit area. The setup is as follows:
EXIT_PTS = np.array([
    [[732, 720], [732, 590], [1280, 500], [1280, 720]],
    [[0, 400], [645, 400], [645, 0], [0, 0]]
])

base = np.zeros(SHAPE + (3,), dtype='uint8')
exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]
Now we link the detected points.
On the first frame, we add all points as new paths.
Next, if len(path) == 1, we find the object closest to the last point of each path among the newly detected objects.
If len(path) > 1, we use the last two points of the path to predict a new point on the same line, and then find the minimum distance between that predicted point and the current point.
The point with the minimum distance is added to the end of the current path and removed from the candidate list. If any points are left after this, we add them as new paths. This step also limits the number of points in each path. See the code after the sketch below.
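The utils.distance helper used in this step is not shown in this excerpt. Given how it is called (two points plus optional x_weight/y_weight), a plausible implementation is a weighted Euclidean distance:

import math


def distance(x, y, x_weight=1.0, y_weight=1.0):
    # weighted Euclidean distance; a larger weight makes that axis "cheaper",
    # which is useful when traffic moves mostly along one axis
    return math.sqrt(
        float((x[0] - y[0]) ** 2) / x_weight +
        float((x[1] - y[1]) ** 2) / y_weight)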
new_pathes = []

for path in self.pathes:
    _min = 999999
    _match = None
    for p in points:
        if len(path) == 1:
            # distance from last point to current
            d = utils.distance(p[0], path[-1][0])
        else:
            # based on 2 prev points predict next point and calculate
            # distance from predicted next point to current
            xn = 2 * path[-1][0][0] - path[-2][0][0]
            yn = 2 * path[-1][0][1] - path[-2][0][1]
            d = utils.distance(
                p[0], (xn, yn),
                x_weight=self.x_weight,
                y_weight=self.y_weight
            )

        if d < _min:
            _min = d
            _match = p

    if _match and _min <= self.max_dst:
        points.remove(_match)
        path.append(_match)
        new_pathes.append(path)

    # do not drop path if current frame has no matches
    if _match is None:
        new_pathes.append(path)

self.pathes = new_pathes

# add new pathes
if len(points):
    for p in points:
        # do not add points that already should be counted
        if self.check_exit(p[1]):
            continue
        self.pathes.append([p])

# save only last N points in path
for i, _ in enumerate(self.pathes):
    self.pathes[i] = self.pathes[i][self.path_size * -1:]
Now, we will try to count the vehicles entering the exit area. To do this, we take the last two points of each path and check that the older one lies outside the exit zone while the newer one lies inside it, and also that len(path) is greater than the limit.
# count vehicles and drop counted pathes:
new_pathes = []
for i, path in enumerate(self.pathes):
    d = path[-2:]

    if (
        # need at least two points to count
        len(d) >= 2 and
        # prev point not in exit zone
        not self.check_exit(d[0][1]) and
        # current point in exit zone
        self.check_exit(d[1][1]) and
        # path len is bigger than min
        self.path_size <= len(path)
    ):
        self.vehicle_count += 1
    else:
        # prevent linking with a path that is already in the exit zone
        add = True
        for p in path:
            if self.check_exit(p[1]):
                add = False
                break
        if add:
            new_pathes.append(path)

self.pathes = new_pathes

context['pathes'] = self.pathes
context['objects'] = objects
context['vehicle_count'] = self.vehicle_count

self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)

return context
The last two processors are a CSV writer, which creates the report CSV file, and a Visualizer, used for debugging and nice image output.
class CsvWriter(PipelineProcessor):

    def __init__(self, path, name, start_time=0, fps=15):
        super(CsvWriter, self).__init__()

        # requires `import csv` and `import os` at module level
        self.fp = open(os.path.join(path, name), 'w')
        self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])
        self.writer.writeheader()
        self.start_time = start_time
        self.fps = fps
        self.path = path
        self.name = name
        self.prev = None

    def __call__(self, context):
        frame_number = context['frame_number']
        count = _count = context['vehicle_count']

        if self.prev:
            _count = count - self.prev

        # pseudo-timestamp: elapsed seconds * 100 plus an intra-second frame offset
        time = ((self.start_time + int(frame_number / self.fps)) * 100
                + int(100.0 / self.fps) * (frame_number % self.fps))
        self.writer.writerow({'time': time, 'vehicles': _count})
        self.prev = count

        return context


class Visualizer(PipelineProcessor):
    # CAR_COLOURS, BOUNDING_BOX_COLOUR, CENTROID_COLOUR and EXIT_COLOR are
    # colour constants defined at module level (not shown in this excerpt)

    def __init__(self, save_image=True, image_dir='images'):
        super(Visualizer, self).__init__()

        self.save_image = save_image
        self.image_dir = image_dir

    def check_exit(self, point, exit_masks=[]):
        for exit_mask in exit_masks:
            if exit_mask[point[1]][point[0]] == 255:
                return True
        return False

    def draw_pathes(self, img, pathes):
        if not img.any():
            return

        for i, path in enumerate(pathes):
            path = np.array(path)[:, 1].tolist()
            for point in path:
                cv2.circle(img, point, 2, CAR_COLOURS[0], -1)
                cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)

        return img

    def draw_boxes(self, img, pathes, exit_masks=[]):
        for (i, match) in enumerate(pathes):

            contour, centroid = match[-1][:2]
            if self.check_exit(centroid, exit_masks):
                continue

            x, y, w, h = contour

            cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1),
                          BOUNDING_BOX_COLOUR, 1)
            cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1)

        return img

    def draw_ui(self, img, vehicle_count, exit_masks=[]):

        # this just adds a green mask with opacity to the image
        for exit_mask in exit_masks:
            _img = np.zeros(img.shape, img.dtype)
            _img[:, :] = EXIT_COLOR
            mask = cv2.bitwise_and(_img, _img, mask=exit_mask)
            cv2.addWeighted(mask, 1, img, 1, 0, img)

        # drawing top block with counts
        cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)
        cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)),
                    (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
        return img

    def __call__(self, context):
        frame = context['frame'].copy()
        frame_number = context['frame_number']
        pathes = context['pathes']
        exit_masks = context['exit_masks']
        vehicle_count = context['vehicle_count']

        frame = self.draw_ui(frame, vehicle_count, exit_masks)
        frame = self.draw_pathes(frame, pathes)
        frame = self.draw_boxes(frame, pathes, exit_masks)

        utils.save_frame(frame, self.image_dir +
                         "/processed_%04d.png" % frame_number)

        return context
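Finally, here is a sketch of how all the processors could be assembled into one pipeline. The article's original main() for this stage is not reproduced above, so treat the parameter values here as plausible defaults rather than the author's exact settings (exit_mask, IMAGE_DIR, VIDEO_SOURCE, and train_bg_subtractor come from the earlier sections):

def main():
    bg_subtractor = cv2.createBackgroundSubtractorMOG2(
        history=500, detectShadows=True)

    pipeline = PipelineRunner(pipeline=[
        ContourDetection(bg_subtractor=bg_subtractor,
                         save_image=True, image_dir=IMAGE_DIR),
        # y_weight > 1 since the traffic moves mostly vertically in the frame
        VehicleCounter(exit_masks=[exit_mask], y_weight=2.0),
        Visualizer(image_dir=IMAGE_DIR),
        CsvWriter(path='./', name='report.csv'),
    ], log_level=logging.DEBUG)

    cap = skvideo.io.vreader(VIDEO_SOURCE)
    train_bg_subtractor(bg_subtractor, cap, num=500)

    frame_number = -1
    for frame in cap:
        if not frame.any():
            break
        frame_number += 1

        pipeline.set_context({
            'frame': frame,
            'frame_number': frame_number,
        })
        pipeline.run()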
Conclusion
As we can see, it's not as hard as many people think. However, if you run the script yourself, you will find that this solution is not ideal: it has problems with overlapping foreground objects, and it does not classify vehicles by type. Still, the algorithm achieves good accuracy when the camera is well positioned, for example directly above the road.
This concludes this article on implementing road vehicle counting with OpenCV. For more on OpenCV-based road vehicle counting, please search my previous articles or continue browsing the related articles below. I hope you will keep supporting me in the future!