# regluit/test/parallel.py


"""
Task: for each of the 900 or so Gutenberg ids, calculate seed isbn(s)

a queue to hold the overall task of calculating the seed isbn
a results queue to hold the seed isbn (or error condition/timeout)
separate queues for GoogleBooks, FreebaseBooks, OpenLibrary, thingisbn queries:
ideally these have a timeout and retry facility, as well as the ability to
sense general failure of the underlying API
ideally: caching of results
ability to persist jobs, suspend, and restart

We will take this in steps by first writing toy models and filling them out.
"""
import multiprocessing
import random
import time
def doubler(n):
    #time.sleep(random.uniform(0, 0.1))
print("in doubler %s " % (n))
return 2*n
def negator(n):
    #time.sleep(random.uniform(0, 0.1))
print("in negator %s " % (n))
return -n
def tripler(n):
    #time.sleep(random.uniform(0, 0.1))
print("in tripler %s " % (n))
return 3*n
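# doubler, negator, and tripler are toy stand-ins for the GoogleBooks,
# FreebaseBooks, OpenLibrary, and thingisbn lookups named in the docstring;
# the retry wrapper sketched above could be exercised as, e.g.,
# retry_query(doubler, 21).
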
class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

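# The docstring also asks for caching of results. A minimal sketch of a
# dict-based memoizer; cached_call and _cache are illustrative names, not
# part of regluit. Note that each worker process gets its own copy of the
# dict -- sharing a cache across Consumers would need something like
# multiprocessing.Manager().dict().
_cache = {}

def cached_call(fn, n):
    # Compute fn(n) once per (function, argument) pair, then reuse it.
    key = (fn.__name__, n)
    if key not in _cache:
        _cache[key] = fn(n)
    return _cache[key]
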
class NetTask(object):
    def __init__(self, n):
        self.n = n

    def __call__(self):
print("NetTask %s" % (self.n))
results = [apply(doubler, (self.n,)),
apply(negator, (self.n,)),
apply(tripler, (self.n,))]
        print(results)
        print("NetTask about to return results for %s" % self.n)
        return (self.n, results)
        #return (self.n, sum(r.get() for r in async_results))

    def __str__(self):
        return 'Totaler (%d)' % self.n

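# The docstring also asks for the ability to persist jobs so a run can be
# suspended and restarted. A minimal sketch using pickle; persist_results
# and the default filename are illustrative, not part of regluit. main()
# could call persist_results(net_results) after its collection loop, and
# reload the file on startup to skip ids already done.
def persist_results(net_results, path='net_results.pickle'):
    # Write the accumulated {id: results} mapping to disk.
    import pickle
    with open(path, 'wb') as f:
        pickle.dump(net_results, f)
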
def main():
    # generate a queue to hold the tasks and a queue to hold the results
    tasks = multiprocessing.JoinableQueue()
    results_queue = multiprocessing.Queue()
    # placeholder for one of the planned per-API queues; not used yet
    doubler_queue = multiprocessing.JoinableQueue()
    random.seed()

    # Start consumers
    num_consumers = multiprocessing.cpu_count()
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(tasks, results_queue)
                 for i in range(num_consumers)]
    for w in consumers:
        w.start()

    n_tasks = 2
    # enqueue one totaler (NetTask) operation per id; the consumer
    # processes started above pick them up from the task queue
    for k in range(n_tasks):
        tasks.put(NetTask(k))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # while there is an expectation of more results, read results from the results queue
    results_so_far = 0
    net_results = {}
    while results_so_far < n_tasks:
        result = results_queue.get()
        net_results[result[0]] = result[1]
        print(result)
        results_so_far += 1
print("net results", net_results)
if __name__ == '__main__':
main()