
In my previous post, Co-pilot for Donkey Kong, I built a ‘co-pilot’ robot for my putrid pal Arkwood, to help him gain a high score in the retro computer game Donkey Kong. What the devil is a ‘co-pilot’ robot? I hear you ask. Allow me to explain.

The robot itself is some Python code running on my tiny Raspberry Pi computer.

I have used the PyBrain Reinforcement Learning package in order to provide my robot with a brain, so that it can learn how to help Arkwood play Donkey Kong.

I have used a webcam attached to the Pi to provide my robot with a set of eyes, so that it can detect objects in the computer game. The object in question is an umbrella, for which I have trained an OpenCV haar cascade classifier.

The robot has a mouth, by way of a set of speakers attached to the Pi. I have used Google’s Text To Speech service to allow the robot to speak to Arkwood.

The robot also uses a microphone attached to the Pi for ears. Google’s Speech To Text service allows Arkwood to talk to the robot.

Okay, that’s the robot. Let me give you a step-by-step example of how it learns:

  1. Arkwood starts playing a game of Donkey Kong on his Windows 7 laptop.
  2. The robot detects an umbrella on level 3 of the game, via the webcam that is pointing at the laptop screen.
  3. The robot asks Arkwood what level he is on, and Arkwood replies ‘3’.
  4. The robot then tells Arkwood to make a beeline for the umbrella, and Arkwood dutifully obliges.
  5. The robot then asks Arkwood how many lives he lost on the level, and Arkwood replies ‘2’.
  6. The robot understands that losing two lives is a bad thing, so it learns not to suggest beelining for umbrella again (at least, not on level 3).

And so it goes on. The more training the robot gets, the better the advice it can offer.
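Under the hood, step 6 is a Q-value update. Here’s a minimal sketch of the idea (not PyBrain’s internals), using the same alpha of 0.5 and gamma of 0.0 that the main program hands to its Q learner:

```python
# Minimal sketch of the value update behind step 6 (not PyBrain's
# internals). With gamma at 0.0 the estimate for a (level, action)
# pair simply moves part-way towards the latest reward.
q = {}  # (level, action) -> estimated value

def update(level, action, reward, alpha=0.5):
    key = (level, action)
    old = q.get(key, 0.0)
    q[key] = old + alpha * (reward - old)

# Arkwood loses two lives beelining for the umbrella on level 3...
update(3, 'beeline to umbrella', -1.0)
# ...and the estimate for that advice drops below zero, so the robot
# is less keen to suggest it again
```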

But, before the robot can become a wise oracle, it needs some way of storing what it has learned between games of Donkey Kong. In order to do this, I have inherited from the ActionValueTable class provided by PyBrain, so as to load and save its parameter values:

from pybrain.rl.learners.valuebased import ActionValueTable
import numpy

class GameTable(ActionValueTable):
    PARAMS_FILENAME = "params.npy"

    # load table parameters from file
    def loadParameters(self):
        try:
            self._params = numpy.load(self.PARAMS_FILENAME)
            print "Loaded parameters: {}".format(self._params)
            return True
        except IOError:
            print "Error loading params file"
            return False

    # save table parameters to file
    def saveParameters(self):
        try:
            numpy.save(self.PARAMS_FILENAME, self._params)
            print "Saved parameters: {}".format(self._params)
            return True
        except IOError:
            print "Error saving params file"
            return False

Great. Next time we boot up the Raspberry Pi, the robot will not need to start learning from scratch.
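The persistence itself is nothing fancier than numpy.save and numpy.load on the table’s parameter array. A quick round-trip sketch, using a hypothetical demo filename and 24 values for our 8 states times 3 actions:

```python
import os
import numpy

# the table is 24 values: 8 states (levels) x 3 actions
params = numpy.zeros(24)
params[7] = 0.4   # pretend the robot learned something on level three

numpy.save("params_demo.npy", params)       # hypothetical demo filename
restored = numpy.load("params_demo.npy")
print(numpy.array_equal(params, restored))  # the values survive a reboot

os.remove("params_demo.npy")                # tidy up
```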

Time for a demo. Here’s Arkwood playing levels three and four of Donkey Kong, where the umbrella has been detected:



And here’s the output from the robot, showing how it’s conversing with my chum as per the step-by-step example above:


We can see the parameters being stored by the robot:


The first three parameters (all zero) relate to level one of the game, and correspond to the three actions that the robot can pass to Arkwood, namely ‘beeline to umbrella’, ‘collect umbrella at your leisure’ and ‘avoid umbrella’.

Level three, where the robot detects an umbrella, is represented by parameters seven, eight and nine. We can see that parameters eight and nine have non-zero values – which tells us that the robot has been learning.

Likewise, level four has an umbrella, and is represented by parameters ten, eleven and twelve.
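If you want to check that numbering yourself, the mapping from (level, action) to parameter is just arithmetic on the flat list. A small sketch, using the 1-based parameter numbering above:

```python
# Sketch of how the flat parameter list maps to (level, action) pairs,
# matching the 1-based parameter numbering used in the post.
ACTIONS = ['beeline to umbrella',
           'collect umbrella at your leisure',
           'avoid umbrella']

def param_number(level, action):
    # levels run 1-8, actions 0-2; parameters are numbered from 1
    return (level - 1) * len(ACTIONS) + action + 1

print(param_number(1, 0))  # 1  - level one, 'beeline to umbrella'
print(param_number(3, 0))  # 7  - level three starts at parameter seven
print(param_number(4, 2))  # 12 - level four ends at parameter twelve
```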

Hurray! Our robot is getting smart. All in all, Arkwood played three games of Donkey Kong, and the results are thus:

Game | Action                           | Reward
-----|----------------------------------|-------
   1 | collect umbrella at your leisure |     -1
   2 | avoid umbrella                   |      1
   3 | avoid umbrella                   |      1

For level three of the game the robot is rewarded for offering the advice ‘avoid umbrella’, and so repeats the advice.

Game | Action                           | Reward
-----|----------------------------------|-------
   1 | avoid umbrella                   |      1
   2 | avoid umbrella                   |     -1
   3 | beeline to umbrella              |      0

For level four of the game the robot is initially rewarded for offering the advice ‘avoid umbrella’, and so repeats the advice. However, this repeated advice yields a negative reward (as Arkwood lost two lives), so the robot changes tack for game three and suggests ‘beeline to umbrella’.

Obviously, in practice, the robot would need many iterations of gameplay before it learns how best to instruct my pal. The robot uses a Q-learning algorithm, which encourages it to experiment with its recommendations.
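That experimentation comes from the EpsilonGreedyExplorer imported in the main program: with a small probability the robot tries a random command instead of its current best guess. A rough sketch of the idea (not PyBrain’s implementation):

```python
import random

# Rough sketch of epsilon-greedy selection (not PyBrain's implementation):
# with probability epsilon pick a random command, otherwise the best-valued one.
def choose_action(values, epsilon=0.1, rng=random):
    if rng.random() < epsilon:
        return rng.randrange(len(values))   # explore
    return values.index(max(values))        # exploit

# values for the three commands on one level; with no exploration
# the robot always picks index 2, 'avoid umbrella'
print(choose_action([0.0, -0.5, 0.4], epsilon=0.0))  # 2
```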

‘I think I have fallen in love with the robot’, Arkwood lamented in front of the mirror, combing his greasy hair and clutching a bunch of roses. ‘She has such a beautiful voice. Can you make her learn to love me back?’

I told him, No. Nothing on God’s Earth, whether of silicon or skin and bone, could ever be persuaded to cherish my scrawny Belgian friend.

It did not dampen his spirits, though. He coated his mouth with mint spray and made for the Raspberry Pi, announcing over his shoulder, ‘Don’t wait up for me!’


I’ve made a few amendments to the code since my previous blog. First up, the main program, which now incorporates the inherited ActionValueTable:

from gametable import GameTable
from pybrain.rl.learners import Q
from pybrain.rl.explorers import EpsilonGreedyExplorer
from pybrain.rl.agents import LearningAgent
from gameinteraction import GameInteraction
from gameenvironment import GameEnvironment
from gametask import GameTask
from pybrain.rl.experiments import Experiment

# setup AV Table
av_table = GameTable(8, 3)
if(av_table.loadParameters() == False):
    av_table.initialize(0.)

# setup a Q-Learning agent
learner = Q(0.5, 0.0)
learner._setExplorer(EpsilonGreedyExplorer(0.1))
agent = LearningAgent(av_table, learner)

# setup game interaction
game_interaction = GameInteraction()

# setup environment
environment = GameEnvironment(game_interaction)

# setup task
task = GameTask(environment, game_interaction)

# setup experiment
experiment = Experiment(task, agent)

# okay, let's play the game
while True:
    experiment.doInteractions(1)
    agent.learn()
    agent.reset()

    av_table.saveParameters()

Next, the task and environment classes, which now receive the game’s interaction class via a constructor parameter:

from pybrain.rl.environments.task import Task

class GameTask(Task):

    def __init__(self, environment, game_interaction):
        self.env = environment
        self.game_interaction = game_interaction

    def getObservation(self):
        return self.env.getSensors()

    def performAction(self, action):
        self.env.performAction(action)

    def getReward(self):
        return self.game_interaction.get_result()
from pybrain.rl.environments.environment import Environment

class GameEnvironment(Environment):

    def __init__(self, game_interaction):
        self.game_interaction = game_interaction

    def getSensors(self):
        return self.game_interaction.get_level()

    def performAction(self, action):
        self.game_interaction.give_command(action)
And indeed the game interaction class (which handles the step-by-step example outlined in the post):

from webcam import Webcam
from detection import Detection
from speech import Speech
from time import sleep

class GameInteraction(object):

    def __init__(self):
        self.webcam = Webcam()
        self.detection = Detection()    
        self.speech = Speech()


    # 8 states
    LEVEL = {
        '1': [0.0],
        '2': [1.0],
        '3': [2.0],
        '4': [3.0],
        '5': [4.0],
        '6': [5.0],
        '7': [6.0],
        '8': [7.0]

    # 3 actions
    COMMAND = {
        0.0: "beeline to umbrella",
        1.0: "collect umbrella at your leisure",
        2.0: "avoid umbrella"

    # 3 rewards
    RESULT = {
        'none': 1.0,
        '1': 0.0,
        '2': -1.0

    def get_level(self):

        # await detection of umbrella
        while True:

            image = self.webcam.get_current_frame()
            item_detected = self.detection.is_item_detected_in_image('haarcascade_umbrella.xml', image)

            if item_detected:
                break

        # ask Arkwood for game level
        self.speech.text_to_speech("What level are you on?")
        # wait for Arkwood's response
        response = ""
        while response not in self.LEVEL:
            response = self.speech.speech_to_text('/home/pi/PiAUISuite/VoiceCommand/speech-recog.sh').lower()

        return self.LEVEL[response]

    def give_command(self, action):
        # get command
        command_key = float(action)
        command = ""

        if command_key in self.COMMAND:
            command = self.COMMAND[command_key]

        # give command to Arkwood
        self.speech.text_to_speech(command)

    def get_result(self):
        # ask Arkwood for result
        self.speech.text_to_speech("How many lives did you lose on the level?")

        # wait for Arkwood's response
        response = ""
        while response not in self.RESULT:
            response = self.speech.speech_to_text('/home/pi/PiAUISuite/VoiceCommand/speech-recog.sh').lower()

        return self.RESULT[response]

Now, the webcam class has had a bit of an overhaul. The program was suffering from a time lag, which meant that photos of the gameplay served up to the robot were falling behind by as much as five frames. Fortunately a stackoverflow post provided the solution, utilising a thread in order to retrieve the bang latest frame:

import cv2
from threading import Thread

class Webcam:

    def __init__(self):
        self.video_capture = cv2.VideoCapture(0)
        self.current_frame = self.video_capture.read()[1]
    # create thread for capturing images
    def start(self):
        Thread(target=self._update_frame, args=()).start()

    def _update_frame(self):
        while(True):
            self.current_frame = self.video_capture.read()[1]
    # get the current frame
    def get_current_frame(self):
        return self.current_frame
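The trick generalises beyond webcams. Here’s the same single-slot pattern with a plain counter standing in for the camera, to show why the reader never falls behind: the background thread keeps overwriting one shared value, so whoever asks always gets the freshest one rather than a queue of stale frames.

```python
import time
from threading import Thread

class LatestValue:

    def __init__(self):
        self.current = 0
        self._running = True

    # background thread keeps overwriting the single shared slot
    def start(self):
        Thread(target=self._update).start()

    def _update(self):
        while self._running:
            self.current += 1   # stand-in for video_capture.read()
            time.sleep(0.001)

    def stop(self):
        self._running = False

source = LatestValue()
source.start()
time.sleep(0.05)
first = source.current
time.sleep(0.05)
second = source.current
source.stop()
print(second > first)  # later reads see fresher values - no backlog
```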

And here’s the class that uses the OpenCV haar cascade classifier to detect the umbrella. I have upgraded my classifier from the previous post, cropping the positive images to remove the platform under the umbrella. The positive images shrunk from height 15 to height 14, and the stages of training shrunk from 10 to 9. The classifier works great, consistently detecting the umbrella on levels three and four of the game (and not detecting anything else besides):

import cv2
from datetime import datetime

class Detection(object):
    # is item detected in image
    def is_item_detected_in_image(self, item_cascade_path, image):

        # do detection
        item_cascade = cv2.CascadeClassifier(item_cascade_path)
        gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        items = item_cascade.detectMultiScale(gray_image, scaleFactor=1.1, minNeighbors=8)

        # highlight any detections on the image
        for (x,y,w,h) in items:
            cv2.rectangle(image, (x,y), (x+w,y+h), (255,0,0), 2)

        # save image to disk
        self._save_image(image)

        # indicate whether item detected in image
        return len(items) > 0

    # save image to disk
    def _save_image(self, img):
        filename = datetime.now().strftime('%Y%m%d_%Hh%Mm%Ss%f') + '.jpg'
        cv2.imwrite("WebCam/Detection/" + filename, img)

Lastly, the speech class, which handles the calls to Google’s Speech To Text and Text To Speech services:

from subprocess import Popen, PIPE, call
import urllib

class Speech(object):

    # converts speech to text
    def speech_to_text(self, filepath):
        try:
            # utilise PiAUISuite to turn speech into text
            text = Popen(['sudo', filepath], stdout=PIPE).communicate()[0]

            # tidy up text
            text = text.replace('"', '').strip()

            # debug
            print (text)

            return text
        except Exception:
            print ("Error translating speech")
            return ""

    # converts text to speech
    def text_to_speech(self, text):
        try:
            # truncate text as google only allows 100 chars
            text = text[:100]

            # encode the text
            query = urllib.quote_plus(text)

            # build endpoint
            endpoint = "http://translate.google.com/translate_tts?tl=en&q=" + query

            # debug
            print (text)

            # get google to translate and mplayer to play
            call(["mplayer", endpoint], shell=False, stdout=PIPE, stderr=PIPE)
        except Exception:
            print ("Error translating text")
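One detail worth noting in the encoding step: quote_plus turns spaces into ‘+’ and percent-encodes the rest, which is exactly what a query string wants. (In Python 3 the function moved to urllib.parse; the Python 2 code above calls urllib.quote_plus directly.)

```python
from urllib.parse import quote_plus  # Python 3 location; Python 2 has urllib.quote_plus

# spaces become '+', the question mark becomes %3F
print(quote_plus("What level are you on?"))  # What+level+are+you+on%3F
```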