Python Cookbook, 2nd Edition (Jun 10, 2005) [Electronic resources]

David Ascher, Alex Martelli, Anna Ravenscroft


Recipe 9.4. Working with a Thread Pool


Credit: John Nielsen, Justin A


Problem


You want your main thread to be able to
farm out processing tasks to a pool of worker threads.


Solution


The
Queue.Queue type is the simplest and most
effective way to coordinate a pool of worker threads. We could group
all the needed data structures and functions into a class, but
there's no real need to. So, here they are, shown as
globals instead:

import threading, Queue, time, sys
# Globals (start with a capital letter)
Qin  = Queue.Queue()
Qout = Queue.Queue()
Qerr = Queue.Queue()
Pool = []

def report_error():
    ''' we "report" errors by adding error information to Qerr '''
    Qerr.put(sys.exc_info()[:2])

def get_all_from_queue(Q):
    ''' generator to yield, one after the other, all items currently
        in the Queue Q, without any waiting
    '''
    try:
        while True:
            yield Q.get_nowait()
    except Queue.Empty:
        raise StopIteration

def do_work_from_queue():
    ''' the get-some-work, do-some-work main loop of worker threads '''
    while True:
        command, item = Qin.get()       # implicitly stops and waits
        if command == 'stop':
            break
        try:
            # simulated work functionality of a worker thread
            if command == 'process':
                result = 'new' + item
            else:
                raise ValueError, 'Unknown command %r' % command
        except:
            # unconditional except is right, since we report _all_ errors
            report_error()
        else:
            Qout.put(result)

def make_and_start_thread_pool(number_of_threads_in_pool=5, daemons=True):
    ''' make a pool of N worker threads, daemonize, and start all of them '''
    for i in range(number_of_threads_in_pool):
        new_thread = threading.Thread(target=do_work_from_queue)
        new_thread.setDaemon(daemons)
        Pool.append(new_thread)
        new_thread.start()

def request_work(data, command='process'):
    ''' work requests are posted as (command, data) pairs to Qin '''
    Qin.put((command, data))

def get_result():
    return Qout.get()                   # implicitly stops and waits

def show_all_results():
    for result in get_all_from_queue(Qout):
        print 'Result:', result

def show_all_errors():
    for etyp, err in get_all_from_queue(Qerr):
        print 'Error:', etyp, err

def stop_and_free_thread_pool():
    # order is important: first, request all threads to stop...:
    for i in range(len(Pool)):
        request_work(None, 'stop')
    # ...then, wait for each of them to terminate:
    for existing_thread in Pool:
        existing_thread.join()
    # clean up the pool from now-unused thread objects
    del Pool[:]


Discussion


It is generally a mistake to architect a multithreading program on
the premise of having it spawn arbitrarily high numbers of threads as
needed. Most often, the best architecture for such a program is based
on farming out work to a fixed and relatively small number of
worker threads, an arrangement known as a
thread pool. This recipe shows a very simple
example of a thread pool, focusing on the use of
Queue.Queue instances as the most useful and
simplest way for inter-thread communication and synchronization.

In this recipe, worker threads run function
do_work_from_queue, which has the right structure
for a typical worker thread but does really minimal
"processing" (just as an example).
In this case, the worker thread computes a
"result" by prepending the string
'new' to each arriving item (note that this
implicitly assumes that arriving items are strings). In your
applications, of course, you will have, in the equivalent of this
do_work_from_queue function, more substantial
processing, and quite possibly different kinds of processing
depending on the value of the command parameter.
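One common way to support several kinds of processing is to dispatch on the command through a dictionary of handler functions. Here is a minimal sketch of that idea in Python 3 (where the module is named queue, lowercase); the handler names and commands are illustrative, not part of the recipe:

```python
import queue
import threading

Qin = queue.Queue()
Qout = queue.Queue()

# illustrative handlers: map each command to the function that processes it
handlers = {
    'upper': str.upper,
    'reverse': lambda s: s[::-1],
}

def worker():
    ''' dispatch each work item to the handler named by its command '''
    while True:
        command, item = Qin.get()
        if command == 'stop':
            break
        Qout.put(handlers[command](item))

t = threading.Thread(target=worker, daemon=True)
t.start()
Qin.put(('upper', 'spam'))
Qin.put(('reverse', 'spam'))
Qin.put(('stop', None))
t.join()

results = []
while not Qout.empty():
    results.append(Qout.get_nowait())
# results is now ['SPAM', 'maps']
```

A real worker would wrap the handler call in try/except and report failures, exactly as the recipe's do_work_from_queue does.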

In addition to the worker threads in the pool, a multithreading
program often has other specialized threads for various purposes,
such as interfacing to various entities external to the program (a
GUI, a database, a library that is not guaranteed to be thread-safe).
In this recipe, such specialized threads are not shown. However, it
does include at least a "main
thread", which starts and stops the thread pool,
determines the units of work to be farmed out, and eventually gathers
all results and any errors that may have been reported.

In your applications, you may or may not want to start and stop the
thread pool repeatedly. Most typically, you may start the pool as a
part of your program's initialization, leave it
running throughout, and stop it, if at all, only as a part of your
program's final cleanup. If you set your worker
threads as "daemons", as this
recipe's function
make_and_start_thread_pool sets them by default, it
means that your program will not continue running when only worker
threads are left. Rather, your program will terminate as soon as the
main thread terminates. Again, this arrangement is a typically
advisable architecture. At any rate, the recipe also provides a
function stop_and_free_thread_pool, just in case you
do want to terminate and clean up your thread
pool at some point (and possibly later make and restart another one
with another call to make_and_start_thread_pool).
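Note that modern Python (3.2 and later) packages this whole start-farm-out-gather-stop pattern in the standard library as concurrent.futures.ThreadPoolExecutor. A minimal sketch of the recipe's example in those terms, shown for comparison only:

```python
from concurrent.futures import ThreadPoolExecutor

def process(item):
    # the same "simulated work" the recipe's worker threads perform
    return 'new' + item

# the executor starts the workers, and the with statement joins them on exit
with ThreadPoolExecutor(max_workers=5) as pool:
    # map farms items out to the pool and yields results in request order
    results = list(pool.map(process, ['_ba', '_be', '_bo']))
# results == ['new_ba', 'new_be', 'new_bo']
```

Unlike the recipe's raw queues, Executor.map also preserves the ordering of results relative to requests, a point the Discussion returns to below.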

An example use of the functionality in this recipe might be:

for i in ('_ba', '_be', '_bo'): request_work(i)
make_and_start_thread_pool( )
stop_and_free_thread_pool( )
show_all_results( )
show_all_errors( )

The output from this snippet should normally be:

Result: new_ba
Result: new_be
Result: new_bo

although it's possible (but
quite unlikely) that two of the results might end up exchanged. (If
ordering of results is important to you, be sure to add a progressive
number to the work requests you post from the main thread, and report
it back to the main thread as part of each result or error.)
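That tagging scheme can be sketched as follows (a Python 3 sketch; the queue names mirror the recipe's, and the sentinel convention of a None item is an assumption of this sketch, not the recipe's 'stop' command):

```python
import queue
import threading

Qin = queue.Queue()
Qout = queue.Queue()

def worker():
    while True:
        seq, item = Qin.get()
        if item is None:              # sentinel: this worker should stop
            break
        # echo the sequence number back alongside each result
        Qout.put((seq, 'new' + item))

threads = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
for t in threads:
    t.start()
for seq, item in enumerate(['_ba', '_be', '_bo']):
    Qin.put((seq, item))
for t in threads:
    Qin.put((None, None))             # one sentinel per worker
for t in threads:
    t.join()

# sort gathered results by sequence number to restore request order
ordered = [res for _, res in sorted(Qout.get_nowait() for _ in range(3))]
# ordered == ['new_ba', 'new_be', 'new_bo'], whatever order workers finished in
```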

Here is a case where an error occurs and gets reported:

for i in ('_ba', 7, '_bo'): request_work(i)
make_and_start_thread_pool( )
stop_and_free_thread_pool( )
show_all_results( )
show_all_errors( )

The output from this snippet should normally be (net of an extremely
unlikely, but not impossible, exchange between the two
"Result" lines):

Result: new_ba
Result: new_bo
Error: exceptions.TypeError cannot concatenate 'str' and 'int' objects

The worker thread that gets the item 7 reports a
TypeError because it tries to concatenate the
string 'new' with this item, which is an
int, an invalid operation. Not to worry: we
have the try/except statement
in function do_work_from_queue exactly to catch any
kind of error, and Queue Qerr and
functions report_error and
show_all_errors exactly to ensure that errors do not
pass silently, unless explicitly silenced, which is a key point of
Python's general approach to programming.
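The same report-don't-swallow idiom rendered in Python 3 (a sketch; the names Qerr and report_error mirror the recipe's, and the queue module is lowercase in Python 3):

```python
import queue
import sys

Qerr = queue.Queue()

def report_error():
    # record the exception type and value; the traceback object is dropped
    Qerr.put(sys.exc_info()[:2])

try:
    'new' + 7                  # the same invalid operation as above
except Exception:
    report_error()

etyp, err = Qerr.get_nowait()
# etyp is TypeError; str(err) describes the failed concatenation
```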


See Also


Library Reference docs on
threading and Queue modules;
Python in a Nutshell chapter on
threads.
