SoFunction
Updated on 2024-11-17

Implementation of Road Vehicle Counting Using OpenCV Usage

Today, we will explore together how to implement road traffic counting based on computer vision.


In this tutorial, we will perform motion detection very simply using only Python and OpenCV with the help of a background subtraction algorithm.

We will cover the following four areas:

1. Main idea of background subtraction algorithm for object detection.

2. OpenCV image filters.

3. Detection of objects using contours.

4. Establishment of a structure for further data processing.

Background Deduction Algorithm

There are many different background deduction algorithms, but their main idea is simple.

Suppose there is a video of a room with no people or pets on certain frames, then the video is basically static at this point, which we will call the background (background_layer). So to get the objects that are moving on the video, all we need to do is: subtract the background from the current frame.

We will not be able to get static frames due to lighting variations, human moving objects, or the constant presence of moving people and pets. In this case, we select a number of image frames from the video, and if the vast majority of image frames have some identical pixel point, then this uses the pixel as part of the background_layer.

We will use the MOG algorithm for background deduction

raw frame

The code is shown below:

import os
import logging
import 
import random

import numpy as np
import 
import cv2
import  as plt

import utils
# without this some strange errors happen
(False)
(123)

# ============================================================================
IMAGE_DIR = "./out"
VIDEO_SOURCE = "input.mp4"
SHAPE = (720, 1280) # HxW
# ============================================================================

def train_bg_subtractor(inst, cap, num=500):
  '''
    BG substractor need process some amount of frames to start giving result
  '''
  print ('Training BG Subtractor...')
  i = 0
  for frame in cap:
    (frame, None, 0.001)
    i += 1
    if i >= num:
      return cap

def main():
  log = ("main")

  # creting MOG bg subtractor with 500 frames in cache
  # and shadow detction
  bg_subtractor = cv2.createBackgroundSubtractorMOG2(
    history=500, detectShadows=True)

  # Set up image source
  # You can use also CV2, for some reason it not working for me
  cap = (VIDEO_SOURCE)

  # skipping 500 frames to train bg subtractor
  train_bg_subtractor(bg_subtractor, cap, num=500)

  frame_number = -1
  for frame in cap:
    if not ():
      ("Frame capture failed, stopping...")
      break

    frame_number += 1
    utils.save_frame(frame, "./out/frame_%" % frame_number)
    fg_mask = bg_subtractor.apply(frame, None, 0.001)
    utils.save_frame(frame, "./out/fg_mask_%" % frame_number)
# ============================================================================

if __name__ == "__main__":
  log = utils.init_logging()

  if not (IMAGE_DIR):
    ("Creating image directory `%s`...", IMAGE_DIR)
    (IMAGE_DIR)

  main()

The following foreground image is obtained after processing

Foreground image after removing background

We can see that there is some noise on the foreground image, which can be removed by standard filtering techniques.

filtering radio waves (i.e. pick out one frequency)

For our current situation, we will need the following filter functions: Threshold, Erode, Dilate, Opening, Closing.

First, we use "Closing" to remove gaps in the region, then "Opening" to remove individual independent pixel points, and then "Dilate to remove individual pixels, and then "Dilate" to expand the object to make it thicker. The code is as follows:

def filter_mask(img):
  kernel = (cv2.MORPH_ELLIPSE, (2, 2))
  # Fill any small holes
  closing = (img, cv2.MORPH_CLOSE, kernel)
  # Remove noise
  opening = (closing, cv2.MORPH_OPEN, kernel)
  # Dilate to merge adjacent blobs
  dilation = (opening, kernel, iterations=2)
  # threshold
  th = dilation[dilation < 240] = 0
  return th

The treated outlook is as follows:

Object detection using contours

We will use the function to detect the contour. The parameters we can choose while using it are:

cv2.CV_RETR_EXTERNAL------ gets only the external contour.

cv2.CV_CHAIN_APPROX_TC89_L1------ using Teh-Chin chain approximation algorithm (faster)

The code is as follows:

def get_centroid(x, y, w, h):
   x1 = int(w / 2)
   y1 = int(h / 2)
   cx = x + x1
   cy = y + y1
   return (cx, cy)
 
 def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):
   matches = []
   # finding external contours
   im, contours, hierarchy = (
     fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
   # filtering by with, height
   for (i, contour) in enumerate(contours):
     (x, y, w, h) = (contour)
     contour_valid = (w >= min_contour_width) and (
       h >= min_contour_height)
     if not contour_valid:
       continue
     # getting center of the bounding box
     centroid = get_centroid(x, y, w, h)
     (((x, y, w, h), centroid))
   return matches

Establishment of a data-processing framework

We all know that there is no single algorithm that can handle all problems in ML and CV. Even if such an algorithm existed, we wouldn't use it because it's hardly effective at scale. For example, a few years ago Netflix offered a $3 million prize for the best movie recommendation algorithm. There was a team that accomplished this, but their recommendation algorithm couldn't work at scale, so it was actually useless to the company. However, Netflix still rewarded them with $1 million dollars.

Next we build the framework for solving the problem at hand, which makes it easier to work with the data

class PipelineRunner(object):
   '''
     Very simple pipline.
     Just run passed processors in order with passing context from one to 
     another.
     You can also set log level for processors.
   '''
   def __init__(self, pipeline=None, log_level=):
      = pipeline or []
      = {}
      = (self.__class__.__name__)
     (log_level)
     self.log_level = log_level
     self.set_log_level()
   def set_context(self, data):
      = data
   def add(self, processor):
     if not isinstance(processor, PipelineProcessor):
       raise Exception(
         'Processor should be an isinstance of PipelineProcessor.')
     (self.log_level)
     (processor)
 
   def remove(self, name):
     for i, p in enumerate():
       if p.__class__.__name__ == name:
         del [i]
         return True
     return False
 
   def set_log_level(self):
     for p in :
       (self.log_level)
 
   def run(self):
     for p in :
        = p() 
     ("Frame #%d processed.", ['frame_number'])
     return 
 
 class PipelineProcessor(object):
   '''
     Base class for processors.
   '''
   def __init__(self):
      = (self.__class__.__name__)

First we get a list of processors in running order and let each processor do a part of the work and complete the execution in case order to get the final result.

We start by creating the contour detection processor. The contour detection processor simply combines the previous background deduction, filtering and contour detection sections together, as shown in the code below:

class ContourDetection(PipelineProcessor):
   '''
     Detecting moving objects.
     Purpose of this processor is to subtrac background, get moving objects
     and detect them with a  method, and then filter off-by
     width and height. 
     bg_subtractor - background subtractor isinstance.
     min_contour_width - min bounding rectangle width.
     min_contour_height - min bounding rectangle height.
     save_image - if True will save detected objects mask to file.
     image_dir - where to save images(must exist).    
   '''
 
   def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'):
     super(ContourDetection, self).__init__()
     self.bg_subtractor = bg_subtractor
     self.min_contour_width = min_contour_width
     self.min_contour_height = min_contour_height
     self.save_image = save_image
     self.image_dir = image_dir
 
   def filter_mask(self, img, a=None):
     '''
       This filters are hand-picked just based on visual tests
     '''
     kernel = (cv2.MORPH_ELLIPSE, (2, 2))
     # Fill any small holes
     closing = (img, cv2.MORPH_CLOSE, kernel)
     # Remove noise
     opening = (closing, cv2.MORPH_OPEN, kernel)
     # Dilate to merge adjacent blobs
     dilation = (opening, kernel, iterations=2)
     return dilation
 
   def detect_vehicles(self, fg_mask, context):
     matches = []
     # finding external contours
     im2, contours, hierarchy = (
       fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
     for (i, contour) in enumerate(contours):
       (x, y, w, h) = (contour)
       contour_valid = (w >= self.min_contour_width) and (
         h >= self.min_contour_height)
       if not contour_valid:
         continue
       centroid = utils.get_centroid(x, y, w, h)
       (((x, y, w, h), centroid))
     return matches
 
   def __call__(self, context):
     frame = context['frame'].copy()
     frame_number = context['frame_number']
     fg_mask = self.bg_subtractor.apply(frame, None, 0.001)
     # just thresholding values
     fg_mask[fg_mask < 240] = 0
     fg_mask = self.filter_mask(fg_mask, frame_number)
     if self.save_image:
       utils.save_frame(fg_mask, self.image_dir +
                "/mask_%" % frame_number, flip=False)
     context['objects'] = self.detect_vehicles(fg_mask, context)
     context['fg_mask'] = fg_mask
     return contex

Now, let's create a processor that will find out the same objects detected on different frames, create paths, and count the vehicles that reach the exit area. The code is shown below:

  '''
    Counting vehicles that entered in exit zone.

    Purpose of this class based on detected object and local cache create
    objects pathes and count that entered in exit zone defined by exit masks.

    exit_masks - list of the exit masks.
    path_size - max number of points in a path.
    max_dst - max distance between two points.
  '''

  def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0):
    super(VehicleCounter, self).__init__()

    self.exit_masks = exit_masks

    self.vehicle_count = 0
    self.path_size = path_size
     = []
    self.max_dst = max_dst
    self.x_weight = x_weight
    self.y_weight = y_weight

  def check_exit(self, point):
    for exit_mask in self.exit_masks:
      try:
        if exit_mask[point[1]][point[0]] == 255:
          return True
      except:
        return True
    return False

  def __call__(self, context):
    objects = context['objects']
    context['exit_masks'] = self.exit_masks
    context['pathes'] = 
    context['vehicle_count'] = self.vehicle_count
    if not objects:
      return context

    points = (objects)[:, 0:2]
    points = ()

    # add new points if pathes is empty
    if not :
      for match in points:
        ([match])

    else:
      # link new points with old pathes based on minimum distance between
      # points
      new_pathes = []

      for path in :
        _min = 999999
        _match = None
        for p in points:
          if len(path) == 1:
            # distance from last point to current
            d = (p[0], path[-1][0])
          else:
            # based on 2 prev points predict next point and calculate
            # distance from predicted next point to current
            xn = 2 * path[-1][0][0] - path[-2][0][0]
            yn = 2 * path[-1][0][1] - path[-2][0][1]
            d = (
              p[0], (xn, yn),
              x_weight=self.x_weight,
              y_weight=self.y_weight
            )

          if d < _min:
            _min = d
            _match = p

        if _match and _min <= self.max_dst:
          (_match)
          (_match)
          new_pathes.append(path)

        # do not drop path if current frame has no matches
        if _match is None:
          new_pathes.append(path)

       = new_pathes

      # add new pathes
      if len(points):
        for p in points:
          # do not add points that already should be counted
          if self.check_exit(p[1]):
            continue
          ([p])

    # save only last N points in path
    for i, _ in enumerate():
      [i] = [i][self.path_size * -1:]

    # count vehicles and drop counted pathes:
    new_pathes = []
    for i, path in enumerate():
      d = path[-2:]

      if (
        # need at list two points to count
        len(d) >= 2 and
        # prev point not in exit zone
        not self.check_exit(d[0][1]) and
        # current point in exit zone
        self.check_exit(d[1][1]) and
        # path len is bigger then min
        self.path_size <= len(path)
      ):
        self.vehicle_count += 1
      else:
        # prevent linking with path that already in exit zone
        add = True
        for p in path:
          if self.check_exit(p[1]):
            add = False
            break
        if add:
          new_pathes.append(path)

     = new_pathes

    context['pathes'] = 
    context['objects'] = objects
    context['vehicle_count'] = self.vehicle_count

    ('#VEHICLES FOUND: %s' % self.vehicle_count)

    return context

The code above is a bit complex, so let's go over it part by part.

The green part in the image above is the exit area. We count the vehicles here, and we only count them if they move more than 3 points in length

We use a mask to solve this problem because it is much more efficient and simpler than using a vector algorithm. Simply use a "binary sum" to select the midpoint of the vehicle area. The setup is as follows:

EXIT_PTS = ([
   [[732, 720], [732, 590], [1280, 500], [1280, 720]],
   [[0, 400], [645, 400], [645, 0], [0, 0]]
 ])
 
 base = (SHAPE + (3,), dtype='uint8')
 exit_mask = (base, EXIT_PTS, (255, 255, 255))[:, :, 0]

Now we link the detected points.

For the first frame of the image, we add all points as new paths.

Next, if len(path) == 1, we find the object closest to the last point of each path among the newly detected objects.

If len(path) > 1, the last two points in the path are used, i.e. the new point is predicted on the same line and the minimum distance between that point and the current point is found.

Points with minimum distance will be added to the end of the current path and removed from the list. If there are points left after this, we will add them as new paths. This process also limits the number of points in the path.

new_pathes = []
 for path in :
   _min = 999999
   _match = None
   for p in points:
     if len(path) == 1:
       # distance from last point to current
       d = (p[0], path[-1][0])
     else:
       # based on 2 prev points predict next point and calculate
       # distance from predicted next point to current
       xn = 2 * path[-1][0][0] - path[-2][0][0]
       yn = 2 * path[-1][0][1] - path[-2][0][1]
       d = (
         p[0], (xn, yn),
         x_weight=self.x_weight,
         y_weight=self.y_weight
       )
 
     if d < _min:
       _min = d
       _match = p
 
   if _match and _min <= self.max_dst:
     (_match)
     (_match)
     new_pathes.append(path)
 
   # do not drop path if current frame has no matches
   if _match is None:
     new_pathes.append(path)
 
  = new_pathes
 
 # add new pathes
 if len(points):
   for p in points:
     # do not add points that already should be counted
     if self.check_exit(p[1]):
       continue
     ([p])
 
 # save only last N points in path
 for i, _ in enumerate():
   [i] = [i][self.path_size * -1:]

Now, we will try to count the vehicles entering the exit area. To do this, we need to get the last 2 points in the path and check if len(path) should be greater than the limit.

# count vehicles and drop counted pathes:
  new_pathes = []
  for i, path in enumerate():
    d = path[-2:]
    if (
      # need at list two points to count
      len(d) >= 2 and
      # prev point not in exit zone
      not self.check_exit(d[0][1]) and
      # current point in exit zone
      self.check_exit(d[1][1]) and
      # path len is bigger then min
      self.path_size <= len(path)
    ):
      self.vehicle_count += 1
    else:
      # prevent linking with path that already in exit zone
      add = True
      for p in path:
        if self.check_exit(p[1]):
          add = False
          break
      if add:
        new_pathes.append(path)
   = new_pathes
  
  context['pathes'] = 
  context['objects'] = objects
  context['vehicle_count'] = self.vehicle_count 
  ('#VEHICLES FOUND: %s' % self.vehicle_count)
  return context

The last two processors are the CSV writer for creating report CSV files, and for debugging and beautiful image visualization.

class CsvWriter(PipelineProcessor):
    def __init__(self, path, name, start_time=0, fps=15):
      super(CsvWriter, self).__init__()
       = open((path, name), 'w')
       = (, fieldnames=['time', 'vehicles'])
      ()
      self.start_time = start_time
       = fps
       = path
       = name
       = None
    def __call__(self, context):
      frame_number = context['frame_number']
      count = _count = context['vehicle_count']
      if :
        _count = count - 
      time = ((self.start_time + int(frame_number / )) * 100
          + int(100.0 / ) * (frame_number % ))
      ({'time': time, 'vehicles': _count})
       = count
      return context
  class Visualizer(PipelineProcessor):
    def __init__(self, save_image=True, image_dir='images'):
      super(Visualizer, self).__init__()
      self.save_image = save_image
      self.image_dir = image_dir
    def check_exit(self, point, exit_masks=[]):
      for exit_mask in exit_masks:
        if exit_mask[point[1]][point[0]] == 255:
          return True
      return False
    def draw_pathes(self, img, pathes):
      if not ():
        return
      for i, path in enumerate(pathes):
        path = (path)[:, 1].tolist()
        for point in path:
          (img, point, 2, CAR_COLOURS[0], -1)
          (img, [np.int32(path)], False, CAR_COLOURS[0], 1)
      return img
    def draw_boxes(self, img, pathes, exit_masks=[]):
      for (i, match) in enumerate(pathes):
        contour, centroid = match[-1][:2]
        if self.check_exit(centroid, exit_masks):
          continue
        x, y, w, h = contour
        (img, (x, y), (x + w - 1, y + h - 1),
               BOUNDING_BOX_COLOUR, 1)
        (img, centroid, 2, CENTROID_COLOUR, -1)
      return img
    def draw_ui(self, img, vehicle_count, exit_masks=[]):
      # this just add green mask with opacity to the image
      for exit_mask in exit_masks:
        _img = (, )
        _img[:, :] = EXIT_COLOR
        mask = cv2.bitwise_and(_img, _img, mask=exit_mask)
        (mask, 1, img, 1, 0, img)
      # drawing top block with counts
      (img, (0, 0), ([1], 50), (0, 0, 0), )
      (img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30),
            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
      return img
    def __call__(self, context):
      frame = context['frame'].copy()
      frame_number = context['frame_number']
      pathes = context['pathes']
      exit_masks = context['exit_masks']
      vehicle_count = context['vehicle_count']
      frame = self.draw_ui(frame, vehicle_count, exit_masks)
      frame = self.draw_pathes(frame, pathes)
      frame = self.draw_boxes(frame, pathes, exit_masks)
      utils.save_frame(frame, self.image_dir +
               "/processed_%" % frame_number)
      return context

reach a verdict

As we can see, it's not as hard as many people think. However, if the partner runs the script, the partner will find that this solution is not ideal, there are problems with overlapping foreground objects, and it does not categorize vehicles by type. However, the algorithm has good accuracy when the camera has a good location, for example, directly above the road.

To this article on the use of OpenCV to achieve the use of road vehicle counting methodology is introduced to this article, more related to OpenCV road vehicle counting content, please search for my previous articles or continue to browse the following related articles I hope that you will support me more in the future!