From 9334ab0cdbd54b8f52b244f90a896196da9693ee Mon Sep 17 00:00:00 2001 From: Raymond Yee Date: Mon, 6 Feb 2012 20:16:37 -0800 Subject: [PATCH] I've learned how to use a multiprocessing.Manager and dict to share results. Now can I extend it? --- test/parallel.py | 38 +++------------ test/parallel_with_manager.py | 90 +++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 31 deletions(-) create mode 100644 test/parallel_with_manager.py diff --git a/test/parallel.py b/test/parallel.py index 885de929..4203d690 100644 --- a/test/parallel.py +++ b/test/parallel.py @@ -33,8 +33,6 @@ def tripler(n): return 3*n - - class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): @@ -63,32 +61,17 @@ class NetTask(object): self.n = n def __call__(self): print "NetTask %s" % (self.n) - global doubler_pool, negator_pool, tripler_pool - print doubler_pool, negator_pool, tripler_pool - async_results = [doubler_pool.apply_async(doubler, (self.n,)), - negator_pool.apply_async(negator, (self.n,)), - tripler_pool.apply_async(tripler, (self.n,))] - print async_results - print "NetTask about to return async_results for %s" % (self.n) - return (self.n, async_results) + results = [apply(doubler, (self.n,)), + apply(negator, (self.n,)), + apply(tripler, (self.n,))] + print results + print "NetTask about to return results for %s" % (self.n) + return (self.n, results) #return (self.n, sum(r.get() for r in async_results)) def __str__(self): return 'Totaler (%d)' % (self.n) -def pooler(): - - global doubler_pool, negator_pool, tripler_pool - - TO_CALC = 10 - results = [] - - for n in range(TO_CALC): - async_results = [doubler_pool.apply_async(doubler, (n,)), negator_pool.apply_async(negator, (n,)), tripler_pool.apply_async(tripler, (n,)),] - results.append(async_results) - - for result in results: - print(sum(r.get() for r in result)) def main(): @@ -96,11 +79,7 @@ def main(): tasks = multiprocessing.JoinableQueue() results_queue = multiprocessing.Queue() - global doubler_pool, negator_pool, tripler_pool - - doubler_pool = multiprocessing.Pool(1) #use one core for now - negator_pool = multiprocessing.Pool(1) - tripler_pool = multiprocessing.Pool(1) + doubler_queue = multiprocessing.JoinableQueue() random.seed() @@ -111,9 +90,6 @@ def main(): for i in xrange(num_consumers) ] for w in consumers: w.start() - - # demonstrate that we can do stuff with the pools - pooler() n_tasks = 2 diff --git a/test/parallel_with_manager.py b/test/parallel_with_manager.py new file mode 100644 index 00000000..4d7017a1 --- /dev/null +++ b/test/parallel_with_manager.py @@ -0,0 +1,90 @@ +import multiprocessing +from functools import partial + +def wrap_with_args(f): + def f1(*args, **kwargs): + r = f(*args, **kwargs) + return ((args, tuple(sorted(kwargs.items()))), r) # return hashable components + return f1 + +@wrap_with_args +def doubler(n): + #time.sleep(random.uniform(0,0.1)) + print "in doubler %s " % (n) + return 2*n + +class DoubleTask(object): + def __init__(self, n): + self.n = n + def __call__(self): + print "DoubleTask %s" % (self.n) + return (self.n, 2*self.n) + def __str__(self): + return 'DoubleTask (%d)' % (self.n) + +class ConsumerWithResultDict(multiprocessing.Process): + + def __init__(self, task_queue, result_dict): + multiprocessing.Process.__init__(self) + self.task_queue = task_queue + self.result_dict = result_dict + + def run(self): + proc_name = self.name + while True: + next_task = self.task_queue.get() + if next_task is None: + # Poison pill means shutdown + print '%s: Exiting' % proc_name + self.task_queue.task_done() + break + print '%s: %s' % (proc_name, next_task) + answer = next_task() + print "answer: ", answer + self.task_queue.task_done() + self.result_dict[answer[0]] = answer[1] + return + + +class Consumer(multiprocessing.Process): + + def __init__(self, task_queue, result_queue): + multiprocessing.Process.__init__(self) + self.task_queue = task_queue + self.result_queue = result_queue + + def run(self): + proc_name = self.name + while True: + next_task = self.task_queue.get() + if next_task is None: + # Poison pill means shutdown + print '%s: Exiting' % proc_name + self.task_queue.task_done() + break + print '%s: %s' % (proc_name, next_task) + answer = next_task() + self.task_queue.task_done() + self.result_queue.put(answer) + return + +if __name__ == '__main__': + + manager = multiprocessing.Manager() + + doubler_queue = multiprocessing.JoinableQueue() + results = manager.dict() + + doubler_processor = ConsumerWithResultDict(doubler_queue, results) + doubler_processor.start() + + n_tasks = 10 + for k in xrange(n_tasks): + doubler_queue.put(DoubleTask(k)) + doubler_queue.put(None) # mark the end + + doubler_queue.join() + + print results + + \ No newline at end of file