
Arkwood, my lewd Belgian buddy, is an avid player of retro computer games. Right now, he is knee-deep in the classic game Frogger. Here’s a screenshot of Arkwood as the frog, trying to cross the busy motorway and hop across the logs to get to the safety of the riverbank:


‘I always have problems with the traffic,’ he moaned, ‘I can never hear it coming.’ I told him not to worry, that I would write some Python code on my Raspberry Pi computer which would detect the flow of vehicles and play a traffic sound to alert him.

But first, I need a way of taking a series of screenshots of Frogger, whilst Arkwood plays it on his Windows 7 laptop, saving them to my Raspberry Pi for real-time analysis. Here’s the code:

from time import sleep
from PIL import ImageGrab

image_counter = 0

while True:
    screenshot = ImageGrab.grab(bbox=(0, 50, 1366, 650))
    screenshot.save(r"\\RASPBERRYPI\MyPython\opticalflow\screenshots\{}.jpg".format(image_counter), "JPEG")

    image_counter += 1
    sleep(1)

Great. Now that my Pi is being fed screenshots of Frogger as Arkwood tackles the game, it can inspect the snaps and determine the flow of traffic.

The OpenCV Optical Flow tutorial explains how to detect the movement of objects across a series of images. Here’s the code from the referenced OpenCV samples/python2/lk_track.py file, with a few adjustments to service Arkwood’s needs:

import cv2
import numpy as np
from common import draw_str
from time import sleep
from datetime import datetime
import pygame

lk_params = dict( winSize  = (15, 15),
                  maxLevel = 2,
                  criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

feature_params = dict( maxCorners = 500,
                       qualityLevel = 0.6,
                       minDistance = 7,
                       blockSize = 7 )

class OpticalFlow:
    def __init__(self):
        self.track_len = 10
        self.detect_interval = 5
        self.tracks = []
        self.frame_idx = 0

    # detect optical flow in a series of images
    def run(self):
        while True:
            frame = self._load_image()
            frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            vis = frame.copy()

            if len(self.tracks) > 0:
                img0, img1 = self.prev_gray, frame_gray
                p0 = np.float32([tr[-1] for tr in self.tracks]).reshape(-1, 1, 2)
                p1, st, err = cv2.calcOpticalFlowPyrLK(img0, img1, p0, None, **lk_params)
                p0r, st, err = cv2.calcOpticalFlowPyrLK(img1, img0, p1, None, **lk_params)
                d = abs(p0-p0r).reshape(-1, 2).max(-1)
                good = d < 1
                new_tracks = []
                for tr, (x, y), good_flag in zip(self.tracks, p1.reshape(-1, 2), good):
                    if not good_flag:
                        continue
                    tr.append((x, y))
                    if len(tr) > self.track_len:
                        del tr[0]
                    new_tracks.append(tr)
                    cv2.circle(vis, (x, y), 2, (0, 255, 0), -1)
                self.tracks = new_tracks
                cv2.polylines(vis, [np.int32(tr) for tr in self.tracks], False, (0, 255, 0))
                draw_str(vis, (20, 20), 'track count: %d' % len(self.tracks))


            if self.frame_idx % self.detect_interval == 0:
                mask = np.zeros_like(frame_gray)
                mask[:] = 255
                for x, y in [np.int32(tr[-1]) for tr in self.tracks]:
                    cv2.circle(mask, (x, y), 5, 0, -1)
                p = cv2.goodFeaturesToTrack(frame_gray, mask = mask, **feature_params)
                if p is not None:
                    for x, y in np.float32(p).reshape(-1, 2):
                        self.tracks.append([(x, y)])

            self.frame_idx += 1
            self.prev_gray = frame_gray

            # alert Arkwood if traffic is flowing, then save the
            # marked-up frame for later inspection
            self._play_traffic_sound()
            self._save_image(vis)

# start optical flow
OpticalFlow().run()

So what amendments have I made to the OpenCV code? Firstly, I’ve bumped the ‘qualityLevel’ from 0.3 to 0.6, to improve the accuracy of my vehicle detection. Next, we dispense with the video capture code, opting instead to load each screenshot image from a folder (the exact folder where the Windows 7 laptop is saving Frogger screenshots). There is a new method call, to play a traffic sound if vehicle flow is detected. Finally, instead of showing each screenshot marked with optical flow in a window, we save it to disk to eyeball at our leisure.
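One detail worth calling out in the tracking loop is the forward-backward consistency check. The sketch below uses plain NumPy with made-up coordinates (not real tracker output) to show how the round-trip error weeds out unreliable tracks:

```python
import numpy as np

# Toy data: p0 are the original points; p0r are the same points after
# tracking forward to the next frame and then back again. A reliable
# track should land (almost) where it started.
p0  = np.float32([[[10.0, 20.0]], [[30.0, 40.0]]])
p0r = np.float32([[[10.2, 20.1]], [[35.0, 40.0]]])

# per-point round-trip error, exactly as in lk_track.py
d = abs(p0 - p0r).reshape(-1, 2).max(-1)
good = d < 1  # keep only tracks that return to within one pixel

print(good.tolist())  # [True, False]
```

The first point drifts by only 0.2 of a pixel on the round trip, so it survives; the second is 5 pixels out and gets discarded.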

Here’s the method to load images:

    # load image from disk
    def _load_image(self):
        while True:
            frame = cv2.imread('screenshots/{}.jpg'.format(self.frame_idx), 1)

            if frame is not None:
                return frame


An incremental number is used for each screenshot file in the series, so we simply loop until the next screenshot has been dumped into the folder by the Windows 7 laptop.
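Spinning in a tight loop on cv2.imread does the job, but it pins the Pi’s CPU while it waits. A gentler variant (illustrative only; the function name and poll interval are my own, not tuned) sleeps briefly between checks for the next file:

```python
import os
from time import sleep

def wait_for_file(path, poll_interval=0.1):
    # block until the file exists, yielding the CPU between checks
    while not os.path.exists(path):
        sleep(poll_interval)
```

The _load_image method could call wait_for_file before reading, leaving cv2.imread to deal only with files that already exist (a partially written file is still possible, so the None check remains useful).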

Here’s the method to play traffic sounds:

pygame.mixer.init()
traffic_sound = pygame.mixer.Sound("131259__jcgd2__traffic-noise-in-the-street.wav")

    # play traffic sound
    def _play_traffic_sound(self):
        traffic_sound.stop()

        # total up the x-axis movement of every track (the lorries
        # travel right to left across the screen)
        lorry_movement = 0
        for tr in self.tracks:
            lorry_movement += (tr[len(tr)-2][0]) - (tr[len(tr)-1][0])

        if lorry_movement > 200:
            traffic_sound.set_volume(0.9)
            traffic_sound.play()
        elif lorry_movement > 100:
            traffic_sound.set_volume(0.5)
            traffic_sound.play()

        print lorry_movement

Before we instantiate our OpticalFlow class, we set up a sound file using the Pygame package. I obtained the traffic .wav file from Freesound: https://www.freesound.org/people/jcgd2/sounds/131259/

The _play_traffic_sound method stops any current traffic sound. It then determines how much optical flow has occurred between our previous and current screenshots by inspecting the tracks data. As we will see, it is the lorries on the motorway that are successfully tracked during a game of Frogger, so we concentrate on their movement right to left across the screen (calculating the optical flow on the x-axis).

Once we have our lorry movement, we can play the traffic sound file using Pygame. If the lorry movement is over 200, we play the sound at an ear-splitting volume of 0.9. If the lorry movement is over 100, then our optical flow tracking is not so strong, and we play the sound at volume 0.5. Movement under 100 means no sound is played.

Why play a louder sound when the optical flow is stronger? Well, the basic premise is that more lorries are being tracked, so the motorway is busier for Arkwood. When the lorries exit the screen the tracking is lost and the noise dips.
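The thresholds can be pulled out into a small pure function, which makes the volume mapping easy to tweak and to test in isolation (the cut-offs of 200 and 100 are the ones above; the function name is mine):

```python
def traffic_volume(lorry_movement):
    # map optical flow strength to a playback volume, or None for silence
    if lorry_movement > 200:
        return 0.9
    elif lorry_movement > 100:
        return 0.5
    return None

print(traffic_volume(250))  # 0.9
print(traffic_volume(150))  # 0.5
print(traffic_volume(50))   # None
```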

Okay, here’s the method to save images:

    # save image to disk
    def _save_image(self, image):
        image_name = datetime.now().strftime('%Y%m%d_%Hh%Mm%Ss%f')
        cv2.imwrite('flowoutput/{}.jpg'.format(image_name), image)

Nothing complicated – we just save each optical flow screenshot to disk.

Time for a demo. I ask Arkwood to start playing Frogger on his laptop, whilst screenshots of his gameplay are taken once a second and saved to a folder on the Raspberry Pi.

On the Pi, my OpticalFlow class is waiting to receive the first screenshot. If it detects a suitable level of optical flow it will play traffic sounds through an attached set of speakers. Here are 15 screenshots of Arkwood playing Frogger, with optical flow marked as green lines:
















Hurray! Our lorries are being tracked, and the traffic sounds play through the speakers when optical flow is strong. Here’s the output of the program, showing the calculated lorry movement:


Arkwood was delighted with the traffic sounds, which helped him to safely navigate his webbed-toed amphibian across the busy motorway. Now the crocodiles in the river can get their lunch.


Couple of things to note…

As you will see from the screenshots, some of the features being tracked are never likely to move (such as the score counter at the top of the screen). Since we are calculating the movement of features, these stationary tracks contribute nothing to the lorry movement sum and are effectively ignored.
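To see why the stationary features fall out of the calculation, consider two hypothetical tracks: a lorry crossing the screen and the static score counter. Only the lorry contributes to the x-axis sum:

```python
def lorry_movement(tracks):
    # sum the x-axis displacement between the last two points of each track
    movement = 0
    for tr in tracks:
        movement += tr[-2][0] - tr[-1][0]
    return movement

tracks = [
    [(400.0, 120.0), (250.0, 120.0)],  # lorry: 150px right to left
    [(20.0, 10.0), (20.0, 10.0)],      # score counter: no movement
]
print(lorry_movement(tracks))  # 150.0
```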

If we decrease the ‘qualityLevel’ setting, movement of other objects such as the cars, turtles and logs will also be detected. However, the tracking of these objects proved inconsistent.

The OpenCV code we are using to track optical flow reassesses its ‘good features to track’ every 5 frames, which may cause a blip in our lorry movement calculation.
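One way to soften that blip (a suggestion, not something the program above does) is to average the last few movement readings before comparing against the volume thresholds:

```python
from collections import deque

class MovementSmoother:
    def __init__(self, window=5):
        # keep only the most recent readings in a fixed-size window
        self.recent = deque(maxlen=window)

    def update(self, movement):
        # return the running average over the window
        self.recent.append(movement)
        return sum(self.recent) / float(len(self.recent))

smoother = MovementSmoother(window=3)
print(smoother.update(120))  # 120.0
print(smoother.update(0))    # 60.0  (the blip from re-detection is halved)
print(smoother.update(120))  # 80.0
```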

Syncing is required between the speed at which the laptop is delivering each screenshot to the Raspberry Pi, and the speed at which the Raspberry Pi processes each screenshot.

When making use of the samples/python2/lk_track.py file, the dependent samples/python2/common.py file is required. To resolve an error with common.py, I had to rename ‘lineType’ to ‘linetype’ twice in the draw_str function.