Python Programming by Example (2015)

11. Concurrency

This chapter explains how to write concurrent programs in Python.

11.1 Getting Started

We can run a program or a function in the background using Python. In this chapter, we explore several scenarios for building concurrent applications. You can read more about concurrency in Python at https://docs.python.org/3/library/concurrency.html .

11.2 Threading

We can implement threading by creating a Thread object and passing it the function to run. Call start() to run the thread.

import time

import threading

# shared flag; the main thread sets it to False to stop the worker

running = True

def perform():

    global running

    counter = 0

    running = True

    while running:

        print('counter:', str(counter))

        time.sleep(2)

        counter += 1

my_thread = threading.Thread(target=perform)

# setDaemon() is deprecated in recent Python versions; set the attribute instead

my_thread.daemon = True

my_thread.start()

# python 3

input("Press Enter to stop...")

# python 2

#raw_input("Press Enter to stop...")

running = False

my_thread.join(2)

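To stop the thread, we set running to False so that the loop in perform() ends, then call join() with a timeout value to wait for the thread to finish.

Save the code into a file called ch11_01.py, then run the program.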

$ python3 ch11_01.py

When start() is called, the thread executes the perform() function in the background.

A sample of the program output is shown in the figure below.

p11-1
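
As a variation on the flag used above, the stop request can also be expressed with a threading.Event, which avoids the global variable. The following is only a minimal sketch: the names stop_requested and ticks are made up for this illustration, and the short sleep times are chosen so the example finishes quickly.

```python
import threading
import time

stop_requested = threading.Event()  # hypothetical name; plays the role of the flag
ticks = []                          # records each counter value for inspection

def perform():
    counter = 0
    # loop until the main thread calls stop_requested.set()
    while not stop_requested.is_set():
        ticks.append(counter)
        counter += 1
        # wait() pauses up to 0.05 s but returns early once the event is set
        stop_requested.wait(0.05)

worker = threading.Thread(target=perform, daemon=True)
worker.start()

time.sleep(0.2)        # let the worker run for a moment
stop_requested.set()   # ask the worker to stop
worker.join(2)         # wait at most 2 seconds for it to finish

print('ticks recorded:', len(ticks))
print('still running?', worker.is_alive())
```

Note that join() with a timeout returns even if the thread has not finished, so is_alive() is the way to check whether the thread really stopped.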

A thread can also be implemented as a class derived from threading.Thread. For instance, we create a class, MyThread, that inherits from threading.Thread and implements the run() method.

Write the following script.

import time

import threading

class MyThread(threading.Thread):

    def __init__(self):

        threading.Thread.__init__(self)

        self.running = False

    def run(self):

        counter = 0

        self.running = True

        while self.running:

            print('counter:', str(counter))

            time.sleep(2)

            counter += 1

    def stop(self):

        print('stopping thread...')

        self.running = False

        self.join(2)

my_thread = MyThread()

my_thread.daemon = True

my_thread.start()

# python 3

input("Press Enter to stop...")

# python 2

#raw_input("Press Enter to stop...")

my_thread.stop()

Save the script into a file called ch11_02.py. Our object, my_thread, calls start(), which in turn calls run(). We call stop() to stop our thread; stop() clears the running flag and then calls join().

Now you can run the program.

$ python3 ch11_02.py

A sample of program output:

p11-2
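
Subclassing is not the only way to hand data to a thread. The Thread constructor also accepts an args tuple that is passed to the target function. The sketch below assumes a made-up function, sum_range, and a results dictionary that exist only for this illustration.

```python
import threading

results = {}                     # maps a label to the computed sum
results_lock = threading.Lock()  # protects the shared dictionary

def sum_range(label, start, stop):
    total = sum(range(start, stop))
    with results_lock:
        results[label] = total

threads = [
    threading.Thread(target=sum_range, args=('t1', 0, 100)),
    threading.Thread(target=sum_range, args=('t2', 100, 200)),
]

for t in threads:
    t.start()

# join() without a timeout waits until each thread has finished
for t in threads:
    t.join()

print(results)
```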

11.3 Synchronizing Threads

We can synchronize code that runs in background threads in Python. In this section, we use a mutex lock and an event for thread synchronization.

Let's start.

11.3.1 Mutex Locks

The idea is simple. Before we access a shared resource, we call acquire() on a Lock object. When done, we call release().

For testing, we define a shared resource, a variable called value. This variable will be accessed by two threads. Only one thread can access this variable at a time.

Now write the following script.

import time

import threading

class MyThread(threading.Thread):

    def __init__(self, name, o_lock):

        threading.Thread.__init__(self)

        self.name = name

        self.running = False

        self.value_lock = o_lock

    def run(self):

        global value

        self.running = True

        while self.running:

            self.value_lock.acquire()

            value += 1

            print('value:', str(value),' from ', self.name)

            self.value_lock.release()

            time.sleep(2)

    def stop(self):

        print('stopping ', self.name)

        self.running = False

        self.join(2)

# shared resource; access is protected by value_lock

value = 0

value_lock = threading.Lock()

my_thread1 = MyThread('Thread 1',value_lock)

my_thread1.daemon = True

my_thread2 = MyThread('Thread 2',value_lock)

my_thread2.daemon = True

my_thread1.start()

my_thread2.start()

# python 3

input("Press Enter to stop...")

# python 2

#raw_input("Press Enter to stop...")

my_thread1.stop()

my_thread2.stop()

Save the code into a file called ch11_03.py.

Now you can run the program.

$ python3 ch11_03.py

Program output:

p11-3
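
A Lock object can also be used in a with statement, which calls acquire() on entry and release() on exit, even if the code inside raises an exception. The sketch below is a small illustration of this form; the function add_many and the thread count are assumptions made for the example.

```python
import threading

value = 0
value_lock = threading.Lock()

def add_many(times):
    global value
    for _ in range(times):
        # acquire() on entry, release() on exit
        with value_lock:
            value += 1

threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]

for t in threads:
    t.start()

for t in threads:
    t.join()

print('value:', value)
```

Because every increment happens while the lock is held, the four threads together always add up to 40000.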

11.3.2 Event

Another option for synchronizing threads is the Event object. Call wait() to block: the thread can't execute the code after wait() until the event is set. Then call set() from another thread to release the blocked threads.

For illustration, we create three worker threads. These threads start their work only after set() is called on the event. This is useful when workers must wait for an initialization step to finish.

Write the following script.

import time

import threading

class Worker(threading.Thread):

    def __init__(self, name, signal):

        threading.Thread.__init__(self)

        self.name = name

        self.signal = signal

    def run(self):

        print('waiting from ', self.name)

        self.signal.wait()

        print('processing from ', self.name)

        time.sleep(2)

        print('done from ', self.name)

signal_event = threading.Event()

my_thread1 = Worker('Thread 1', signal_event)

my_thread1.daemon = True

my_thread2 = Worker('Thread 2', signal_event)

my_thread2.daemon = True

my_thread3 = Worker('Thread 3', signal_event)

my_thread3.daemon = True

my_thread1.start()

my_thread2.start()

my_thread3.start()

# waiting for 10 seconds

time.sleep(10)

# start process

print('Send a signal to start processing')

signal_event.set()

# python 3

input("Press Enter to stop...")

# python 2

#raw_input("Press Enter to stop...")

print('Done all')

Save the code into a file called ch11_04.py. Then run the program.

$ python3 ch11_04.py

Program output:

p11-4
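
The wait() method also accepts a timeout. It returns True if the event was set and False if the timeout elapsed first, which lets a thread give up instead of blocking forever. A minimal sketch, with names (ready, log, worker) invented for the illustration:

```python
import threading

ready = threading.Event()
log = []

def worker():
    # wait() returns True if the event was set, False if the timeout elapsed
    if ready.wait(timeout=5):
        log.append('started')
    else:
        log.append('gave up')

# nobody has called set() yet, so this returns False after 0.1 seconds
first_try = ready.wait(timeout=0.1)

t = threading.Thread(target=worker)
t.start()

ready.set()   # release the waiting worker
t.join()

print(first_try, ready.is_set(), log)
```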

11.4 Queue

In this section, we learn about the Queue object from the Python queue module, https://docs.python.org/3/library/queue.html .

For testing, we add some jobs to a Queue. Then, some worker threads pick up the jobs and run them.

Let's write the following script.

import time

import threading

import queue

class Worker(threading.Thread):

    def __init__(self, name, q):

        threading.Thread.__init__(self)

        self.name = name

        self.q = q

    def run(self):

        while True:

            try:

                # get_nowait() never blocks, even if another worker took the last job

                job = self.q.get_nowait()

            except queue.Empty:

                print('thread stopped')

                break

            print('run job', str(job), ' from', self.name)

            time.sleep(1)

            self.q.task_done()

q = queue.Queue()

# generate jobs

print('populate jobs')

for i in range(15):

    q.put(i)

my_thread1 = Worker('Thread 1', q)

my_thread1.daemon = True

my_thread2 = Worker('Thread 2', q)

my_thread2.daemon = True

my_thread3 = Worker('Thread 3', q)

my_thread3.daemon = True

my_thread1.start()

my_thread2.start()

my_thread3.start()

my_thread1.join()

my_thread2.join()

my_thread3.join()

# python 3

input("Press Enter to stop...")

# python 2

#raw_input("Press Enter to stop...")

print('Done all')

Save the code into a file called ch11_05.py.

Now you can run the program.

$ python3 ch11_05.py

Program output:

p11-5
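
When jobs keep arriving while the workers are running, an empty queue does not mean the work is over. One common pattern, sketched below under that assumption, is to let workers block on get() and to send one sentinel value per worker to tell it to stop. The sentinel STOP, the squaring job, and the results queue are made up for this illustration.

```python
import queue
import threading

NUM_WORKERS = 3
STOP = None   # hypothetical sentinel: tells a worker to exit

jobs = queue.Queue()
results = queue.Queue()

def worker():
    while True:
        job = jobs.get()           # blocks until an item is available
        if job is STOP:
            jobs.task_done()
            break
        results.put(job * job)     # the "work": square the number
        jobs.task_done()

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for i in range(15):
    jobs.put(i)

# one sentinel per worker, placed after the real jobs
for _ in threads:
    jobs.put(STOP)

jobs.join()   # returns once task_done() was called for every item
for t in threads:
    t.join()

squares = sorted(results.get() for _ in range(results.qsize()))
print(squares)
```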

11.5 Multiprocessing

We can also implement concurrency in Python using multiprocessing. You can read more about it at https://docs.python.org/3/library/multiprocessing.html .

11.5.1 Process

We can use the Process object to implement multiprocessing in Python. For instance, we build a counter that runs in a separate process.

Write the following script.

import time

import multiprocessing

class MyProcess(multiprocessing.Process):

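    def __init__(self):

        multiprocessing.Process.__init__(self)

        # an Event is shared between processes; a plain attribute is not

        self.stop_event = multiprocessing.Event()

    def run(self):

        counter = 0

        while not self.stop_event.is_set():

            print('counter:', str(counter))

            # wait() pauses for 2 seconds but returns early when stop is requested

            self.stop_event.wait(2)

            counter += 1

    def stop(self):

        print('stopping process...')

        self.stop_event.set()

        self.join(1)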

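# the guard is required on platforms that start a child by re-importing this file

if __name__ == '__main__':

    my_process = MyProcess()

    my_process.daemon = True

    my_process.start()

    # python 3

    input("Press Enter to stop...")

    # python 2

    #raw_input("Press Enter to stop...")

    my_process.stop()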

Save the code into a file called ch11_06.py and run it.

$ python3 ch11_06.py

Program output:

p11-6
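
One point worth keeping in mind with Process: each process has its own memory, so a change made to an ordinary variable in the child is not seen by the parent, and the other way around. The sketch below illustrates this with a hypothetical module-level variable, counter.

```python
import multiprocessing

counter = 0  # hypothetical variable; every process works on its own copy

def child():
    global counter
    counter += 1
    print('child sees counter =', counter)

if __name__ == '__main__':
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()
    # the increment happened in the child's memory only
    print('parent sees counter =', counter)
```

The child prints 1 while the parent still prints 0. To pass a signal or a value between processes, use an object designed for sharing, such as multiprocessing.Event, multiprocessing.Value, or a multiprocessing queue.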

11.5.2 Synchronizing Processes

We can synchronize processes using multiprocessing.Value. This object stores a value in shared memory and comes with a lock that protects access to it.

For testing, we define a shared resource via multiprocessing.Value. Write the following script.

import time

import multiprocessing

class MyProcess(multiprocessing.Process):

    def __init__(self, name, shared_dt):

        multiprocessing.Process.__init__(self)

        self.name = name

        # an Event is shared between processes; a plain attribute is not

        self.stop_event = multiprocessing.Event()

        self.shared_data = shared_dt

    def run(self):

        while not self.stop_event.is_set():

            time.sleep(1)

            with self.shared_data.get_lock():

                self.shared_data.value += 1

                print('value:', str(self.shared_data.value), ' from ', self.name)

    def stop(self):

        print('stopping ', self.name)

        self.stop_event.set()

        # the child may be in the middle of its 1-second sleep

        self.join(2)

# the guard is required on platforms that start a child by re-importing this file

if __name__ == '__main__':

    shared_data = multiprocessing.Value('i', 0, lock=True)

    my_process1 = MyProcess('Process 1', shared_data)

    my_process1.daemon = True

    my_process2 = MyProcess('Process 2', shared_data)

    my_process2.daemon = True

    my_process1.start()

    my_process2.start()

    # python 3

    input("Press Enter to stop...")

    # python 2

    #raw_input("Press Enter to stop...")

    my_process1.stop()

    my_process2.stop()

Save the program into a file called ch11_07.py.

Now you can run the program.

$ python3 ch11_07.py

Program output:

p11-7
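
The same idea also works with plain functions instead of a Process subclass. The sketch below assumes a made-up function, add_many, that increments a shared multiprocessing.Value while holding its lock; four processes each add 1000, so the final total is expected to be 4000.

```python
import multiprocessing

def add_many(shared, times):
    for _ in range(times):
        # get_lock() returns the lock that belongs to the shared Value
        with shared.get_lock():
            shared.value += 1

if __name__ == '__main__':
    total = multiprocessing.Value('i', 0)   # 'i' means a signed integer
    procs = [
        multiprocessing.Process(target=add_many, args=(total, 1000))
        for _ in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print('total:', total.value)
```

The lock is needed because shared.value += 1 is a read followed by a write; without it, two processes could read the same old value and one increment would be lost.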

11.6 Parallel Tasks

The last section implements parallel tasks using concurrent.futures. There are two options: ThreadPoolExecutor and ProcessPoolExecutor.

11.6.1 ThreadPoolExecutor

ThreadPoolExecutor uses a pool of threads to run tasks in parallel.

A sample script for parallel tasks can be written as follows.

import queue

import concurrent.futures

import random

import time

import datetime

def perform(q, a, b, c):

    rand_val = random.uniform(0, 2)

    res = a * b * 10 - c * 2

    time.sleep(rand_val)

    q.put(res)

t1 = datetime.datetime.now()

q = queue.Queue()

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:

    for i in range(1, 15):

        val_a = random.randint(1, 10)

        val_b = random.randint(1, 10)

        val_c = random.randint(1, 10)

        executor.submit(perform, q, val_a, val_b, val_c)

print('Print results')

t2 = datetime.datetime.now()

while not q.empty():

    print(q.get())

t = t2 - t1

print('total time:', str(t.total_seconds()), 'seconds')

Save the code into a file called ch11_08.py and run it.

$ python3 ch11_08.py

Program output:

p11-8
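
The script above collects results through a Queue. As an alternative, submit() returns a Future object whose result() method gives the return value of the task. A minimal sketch, using a hypothetical function compute with the same formula and fixed inputs instead of random ones:

```python
import concurrent.futures

def compute(a, b, c):
    # same formula as perform(), but the result is returned, not queued
    return a * b * 10 - c * 2

inputs = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(compute, a, b, c) for a, b, c in inputs]
    # result() blocks until the corresponding task has finished
    results = [f.result() for f in futures]

print(results)
```

Reading the futures in submission order keeps the results in the same order as the inputs.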

11.6.2 ProcessPoolExecutor

ProcessPoolExecutor uses a pool of processes to run tasks in parallel.

We implement the same scenario as in the previous section.

import multiprocessing

import concurrent.futures

import random

import time

import datetime

def perform(q, a, b, c):

    rand_val = random.uniform(0, 2)

    res = a * b * 10 - c * 2

    time.sleep(rand_val)

    q.put(res)

# the guard is required on platforms that start a child by re-importing this file

if __name__ == '__main__':

    t1 = datetime.datetime.now()

    m = multiprocessing.Manager()

    q = m.Queue()

    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:

        for i in range(1, 15):

            val_a = random.randint(1, 10)

            val_b = random.randint(1, 10)

            val_c = random.randint(1, 10)

            executor.submit(perform, q, val_a, val_b, val_c)

    print('Print results')

    t2 = datetime.datetime.now()

    while not q.empty():

        print(q.get())

    t = t2 - t1

    print('total time:', str(t.total_seconds()), 'seconds')

Save the script into a file called ch11_09.py and run it.

$ python3 ch11_09.py

Program output:

p11-9
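
With processes, returning values from the tasks can replace the manager queue. The sketch below assumes a hypothetical function compute that takes one tuple, and uses executor.map(), which yields the results in the order of the inputs.

```python
import concurrent.futures

def compute(args):
    # the function must be defined at module level so child processes can find it
    a, b, c = args
    return a * b * 10 - c * 2

if __name__ == '__main__':
    inputs = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # map() sends each tuple to a worker process and collects the results
        results = list(executor.map(compute, inputs))
    print(results)
```

Arguments and return values travel between processes by pickling, so they should be simple, picklable objects such as numbers, strings, and tuples.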