# Ambilighht v2 : Threads Inside (en)

### dimanche, 4 janvier 2015

In a previous article (english), I described a serie of preliminary tests towards building a ambilight-like system (in the finest DIY way).

While testing on a less powerful machine, we noted (some hackers from HAUM and I) some kind of slowness in the process.

The last article was considering the following about the last piece of code :

As you can see, the video is pretty smooth and reactive.

I was actually correct.... but not on every machine...

The related piece of code was :

import cv2
import numpy as np

# instanciate CAM object
cam = cv2.VideoCapture(0)

# get one image to process

# get total width/height
l = f.shape[1]
h = f.shape[0]

# select number of regions
nb = 10

# compute height and width of each region
dh = int(f.shape[0]/nb)
bandwidth = int(l*0.05) # 5% of total width

# act continuously
while 1:

# get one image to process

# for each region
for k in xrange(nb):

for i in xrange(dh):
for j in xrange(bandwidth):

# compute averages

# draw rectangles
cv2.rectangle(f, (0,dh*k), (bandwidth,dh*(k+1)), color=val_left, thickness=-1)
cv2.rectangle(f, (l-bandwidth,dh*k), (l,dh*(k+1)), color=val_right, thickness=-1)

# show image instead one saving it
# 'w1 is the window reference
cv2.imshow('w1', f)

# wait 2ms for an Esc input and break if it comes.
if cv2.waitKey(2) == 27:
break


The only change between this code and the one from last article (which had only been tested on ArchLinux) is that I added the 2 last lines. Indeed, without them, ubuntu (on my netbook) just refuse to display anything.

So, we'll proceed as follow :

• analyze existing code to find its weak points
• rewrite the code step by step (explaining by the way the main principle of multi-threaded programming in Python)

# Analysis

This code faces several problems, the first and most obvious (which is forgivable as this script is just a proof of concept) is that there is abbsolutely no error handling (what happened if no camera is found ?). I won't spend a lot of time of this aspect, but in a real finished project, you must fix this point.

If you read carefully, you'll notice that the processing is quite dirty. The algorithm is doing ::

Always Do
Grab an image

For each n in the number of "regions"
Create a white mask for right zone
Create a white mask for left zone

For each i in dh
For each j in bandwith
Replace the point (i,j) with a 1 in right mask
Replace the point (i,j) with a 1 in left mask
End For
End For

Compute average of right zone
Compute average of left zone

Trace right rectangle
Trace left rectangle
End For
End Do


Trained reader will obviously notice that the mask calculations for each zone can be made only once and stored.

It could be done also for zones themselves : we could compute data for the 2 delimiting points and use it for each iteration.

Finally, you'll notice that tracing the rectangles is the only action that modify the image : we could parallelize computation of average colors.

The global idea it to create something like that : the main process grabs images and pre-computes masks and zonees coordinates. Then, it spawns a serie of workers : several to average colors and only one to draw rectangles.

# Preliminaries

## CycleQueue

When using threads, we often use Queue.Queue() to create a waiting queue between data produced by a thread and consummed by another. I've extended this class a bit, alllowing the programmer to store one queue state and to recall this state. The conception of the augmented class is described in another article (fr).

Here is an use case :

from utilities import CycleQueue

# instanciation
q = CycleQueue()

# utilisation comme une Queue classique
for i in xrange(10):
q.put(i)

# vérrouillage de l'état courant
q.lockstate()

# utilisation comme une Queue classique
for i in xrange(10):
q.get()
# faire des trucs

# restoration de l'état verrouillé
q.reinit()

# etc...


## Singleton

Within OOP, a singleton a well-known design pattern. The concept is to build a class that will always reference the same instance of itself.

I'm not always willing to recreate all from scratch, StackOverflow proposes a excellent way of implementing this pattern in Python.

Here is the decorator i used (inside utilities.py):

class Singleton:
"""
A non-thread-safe helper class to ease implementing singletons.
This should be used as a decorator -- not a metaclass -- to the
class that should be a singleton.

The decorated class can define one __init__ function that
takes only the self argument. Other than that, there are
no restrictions that apply to the decorated class.

To get the singleton instance, use the Instance method. Trying
to use __call__ will result in a TypeError being raised.

Limitations: The decorated class cannot be inherited from.

"""

def __init__(self, decorated):
self._decorated = decorated

def Instance(self):
"""
Returns the singleton instance.  Upon its first call, it creates a
new instance of the decorated class and calls its __init__ method.
On all subsequent calls, the already created instance is returned.

"""

try:
return self._instance
except AttributeError:
self._instance = self._decorated()
return self._instance

def __call__(self):
raise TypeError('Singletons must be accessed through Instance().')

def __instancecheck__(self, inst):
return isinstance(inst, self._decorated)


We will only need to decorate singleton classes with @Singleton. Here, it's the class that will handle the image itself which will reference a singleton :

@Singleton
class IMG:
""" Handle current image """

def __init__(self):
self.final_image = None
self.image = None

def new_image(self, f):
self.image = f
self.final_image = f


# Globals

Programmers often fight against global variables, but sometimes, this does allow clean and short function calls. Here are my globals :

# instanciate CAM object
cam = cv2.VideoCapture(0)

l = image_width = f.shape[1]
image_height = f.shape[0]
nb_points = 5
dh = int(f.shape[0]/nb_points)
bandwidth = int(l*0.05) # 5% of total width



## Computation for masks and zones

Let's fill masks with valid data. We'll write a function which will be called in main() to do that. Even if masks is global, I give it to the function as an argument (along with the CycleQueue instance for zones) to make this function more portable.

def enqueue_zones(queue, masks):
""" Generate zones coordinates and masks and enqueue them

tuple format :
(y0, x0, y1, x1, index of masks)

we have to store masks in another lists as Queue() doesn't
recognize numpy arrays as valid datatypes

"""

for h in xrange(nb_points):
for i in xrange(dh):
for j in xrange(bandwidth):

# enqueue
## left zone
queue.put(( 0, dh*h, bandwidth, dh*(h+1), prev_len))
## right zone
queue.put(( image_width-bandwidth, dh*h, image_width, dh*(h+1), prev_len+1))


I think the code is clear enough (if you don't think so, just tell me and I'll explain it).

We now have all we need to feed our workers :)

# Workers

Now, we'll write our two workers :

• color averaging on one hand
• rectangles drawing on another hand

## Why threading rectangles drawing is interesting ?

Threading the drawing worker alllow use to start drawing before the averaging ends.

To achieve that, the two workers pools will discuss through a Queue, averaging workers adding zones and colors to the queue the drawing worker will read later.

We'll have to carefully wait for the emptying of both queues before switching frame.

In Python, writing a thread is quite easy (in comparison to other langages).

We do create a class (inheriting from threading.Thread) and write at least two methods : __init__ and run.

The former contains (as usual) initialization elements and inevitably :

threading.Thread.__init__(self)


To ensure complete setup from parent class.

The latter contains the executed code inside the thread.

A hello World thread is coded as :

import threading

def __init__(self):

def run(self):
while True:
print("Hello, world!")

t.join() # attente de la fin du thread

# fin du programme


Here, we'll use some Queue to make our threads communicate. You must know that other mecanisms exist : mutex, semaphores, locks, etc....

For further information, look at the doc.

## Averager

Here is the code for our dear averager thread :

class ColorAverageWorker(threading.Thread):

def __init__(self, queue, out_queue):
self.queue = queue
self.out_queue = out_queue

def run(self):
while True:
zone = self.queue.get()
# add a dict to out queue :
# zone => zone tuple given by previous queue
# color => color tuple given by cv2.mean()
self.out_queue.put({'zone': zone,
'color': color})


Nothing complicated, uh ? As the greatest part of the job is precomputed, our threads are only quasi-empty workers :)

Notice, however, the usage of :

IMG.Instance().image


This do return the ìmage attribute for the unique instance of IMG class (which is a singleton).

## Drawer

The second thread is almost as simple as the first one :

class WorkerDraw(threading.Thread):

def __init__(self, queue):
self.queue = queue

def run(self):

while True:
point = self.queue.get()
zone = point['zone']
cv2.rectangle(
IMG.Instance().final_image,
(zone[0], zone[1]),
(zone[2], zone[3]),
color=point['color'],
thickness=-1
)


Here again, notice that the zone variable is created only to make the code simpler. Also notice tuples in thhe call foor cv2.rectangle.

Finally, the last thing to notice is that we do modify ÌMG.Instance().final_image and not IMG.Instance().image. It suppress the need for locks and simplify a bit the code.

# Main

Lastly, we can write our master process : the main().

You'll find a strange similarity with the code in the previous article :

def main():

# init a CycleQueue for zones
zones = CycleQueue()
# ... and a Queue for the drawer
out_queue = Queue()

# enqueue zones
zones.lockstate()

# set number of workers (averaging only)
num_workers = 5

# spawn workers
for i in xrange(num_workers):
t = ColorAverageWorker(zones, out_queue)
t.start()

# all workers :)
t = WorkerDraw(out_queue)
t.start()

# loop over frames
while True:

IMG.Instance().new_image(f)

# ensure you have the right queue
zones.reinit()

# wait both queues to be empty
zones.join()
out_queue.join()

# show image and wait for a keystroke
cv2.imshow('w1', IMG.Instance().final_image)

if cv2.waitKey(2) == 27:
break

if __name__=='__main__':
main()


And voilà !