I've learned how to use a multiprocessing.Manager and dict to share results. Now can I extend it?

pull/1/head
Raymond Yee 2012-02-06 20:16:37 -08:00
parent 278701c56b
commit 9334ab0cdb
2 changed files with 97 additions and 31 deletions

View File

@ -33,8 +33,6 @@ def tripler(n):
return 3*n
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
@ -63,32 +61,17 @@ class NetTask(object):
self.n = n
def __call__(self):
print "NetTask %s" % (self.n)
global doubler_pool, negator_pool, tripler_pool
print doubler_pool, negator_pool, tripler_pool
async_results = [doubler_pool.apply_async(doubler, (self.n,)),
negator_pool.apply_async(negator, (self.n,)),
tripler_pool.apply_async(tripler, (self.n,))]
print async_results
print "NetTask about to return async_results for %s" % (self.n)
return (self.n, async_results)
results = [apply(doubler, (self.n,)),
apply(negator, (self.n,)),
apply(tripler, (self.n,))]
print results
print "NetTask about to return results for %s" % (self.n)
return (self.n, results)
#return (self.n, sum(r.get() for r in async_results))
def __str__(self):
return 'Totaler (%d)' % (self.n)
def pooler():
global doubler_pool, negator_pool, tripler_pool
TO_CALC = 10
results = []
for n in range(TO_CALC):
async_results = [doubler_pool.apply_async(doubler, (n,)), negator_pool.apply_async(negator, (n,)), tripler_pool.apply_async(tripler, (n,)),]
results.append(async_results)
for result in results:
print(sum(r.get() for r in result))
def main():
@ -96,11 +79,7 @@ def main():
tasks = multiprocessing.JoinableQueue()
results_queue = multiprocessing.Queue()
global doubler_pool, negator_pool, tripler_pool
doubler_pool = multiprocessing.Pool(1) #use one core for now
negator_pool = multiprocessing.Pool(1)
tripler_pool = multiprocessing.Pool(1)
doubler_queue = multiprocessing.JoinableQueue()
random.seed()
@ -112,9 +91,6 @@ def main():
for w in consumers:
w.start()
# demonstrate that we can do stuff with the pools
pooler()
n_tasks = 2
# create a separate process for each totaler operation

View File

@ -0,0 +1,90 @@
import multiprocessing
from functools import partial
def wrap_with_args(f):
def f1(*args, **kwargs):
r = f(*args, **kwargs)
return ((args, tuple(sorted(kwargs.items()))), r) # return hashable components
return f1
@wrap_with_args
def doubler(n):
#time.sleep(random.uniform(0,0.1))
print "in doubler %s " % (n)
return 2*n
class DoubleTask(object):
def __init__(self, n):
self.n = n
def __call__(self):
print "DoubleTask %s" % (self.n)
return (self.n, 2*self.n)
def __str__(self):
return 'DoubleTask (%d)' % (self.n)
class ConsumerWithResultDict(multiprocessing.Process):
def __init__(self, task_queue, result_dict):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_dict = result_dict
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
print '%s: %s' % (proc_name, next_task)
answer = next_task()
print "answer: ", answer
self.task_queue.task_done()
self.result_dict[answer[0]] = answer[1]
return
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
print '%s: %s' % (proc_name, next_task)
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
if __name__ == '__main__':
manager = multiprocessing.Manager()
doubler_queue = multiprocessing.JoinableQueue()
results = manager.dict()
doubler_processor = ConsumerWithResultDict(doubler_queue, results)
doubler_processor.start()
n_tasks = 10
for k in xrange(n_tasks):
doubler_queue.put(DoubleTask(k))
doubler_queue.put(None) # mark the end
doubler_queue.join()
print results