When a function containing a yield
statement is called, it doesn't actually run the code but returns a generator instead:
>>> p = parallel(1, 2, 3)
>>> p
<generator object parallel at 0x7fde9c1daf00>
Then, when the next value is required, the code will run until a value is yielded:
>>> next(p)
([10000], 6)
>>> next(p)
(6, [10000])
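Once the generator is exhausted, a further next call raises StopIteration; this is standard generator behavior (shown as a sketch, not output captured from the original parallel function):

>>> next(p)
Traceback (most recent call last):
  ...
StopIteration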
If you want to use a generator, you could change your code a bit to target a function that creates a list from the generator:
import multiprocessing as mp

results = []   # filled by the callback in the main process

def parallel2(x, y, z):
    return list(parallel(x, y, z))   # consume the generator so the result is picklable

def collect_results(lst):
    results.extend(lst)

def apply_async_with_callback():
    pool = mp.Pool()
    for _ in range(10):
        pool.apply_async(parallel2, args=(2, 5, 7), callback=collect_results)
    pool.close()
    pool.join()   # wait for the async calls (and their callbacks) to finish
From the documentation: the Pool class represents a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks, has a parallel map implementation, and offers methods that allow tasks to be offloaded to the worker processes in a few different ways. A ThreadPool is the thread-based counterpart: ThreadPool instances are fully interface compatible with Pool instances, and their resources must likewise be properly managed, either by using the pool as a context manager or by calling close() and terminate() manually. In particular, the Pool function provided by multiprocessing.dummy returns an instance of ThreadPool, a subclass of Pool that supports all the same method calls but uses a pool of worker threads rather than worker processes.
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
[1, 4, 9]
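As a sketch of the interface compatibility described above (my own example, reusing the same f), the thread-based pool can be used in exactly the same way; ThreadPool lives in multiprocessing.pool, and multiprocessing.dummy.Pool is an equivalent entry point:

from multiprocessing.pool import ThreadPool

def f(x):
    return x * x

if __name__ == '__main__':
    # Same Pool API, but worker threads instead of worker processes
    with ThreadPool(5) as p:
        print(p.map(f, [1, 2, 3]))   # [1, 4, 9]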
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
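Typical output with a fork-based start method looks roughly like this (the process ids shown here are placeholders and will differ on every run):

main line
module name: __main__
parent process: 428
process id: 5155
function f
module name: __main__
parent process: 5155
process id: 5156
hello bob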
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
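Alternatively, get_context can be used to obtain a context object. Context objects have the same API as the multiprocessing module and allow several start methods to be used in the same program, whereas set_start_method should be called at most once: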
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
The _exit_function eventually calls Pool._terminate_pool. The main thread changes the state of pool._task_handler._state from RUN to TERMINATE. Meanwhile the pool._task_handler thread is looping in Pool._handle_tasks and bails out when it reaches the condition

if thread._state:
    debug('task handler found thread._state != RUN')
    break

This is what stops the task handler from fully consuming your generator, g(). If you look in Pool._handle_tasks you'll see

for i, task in enumerate(taskseq):
    ...
    try:
        put(task)
    except IOError:
        debug('could not put task on queue')
        break

That the main thread is waiting instead of calling _exit_function is what allows the task handler thread to run normally, which means fully consuming the generator as it puts tasks in the workers' inqueue in the Pool._handle_tasks function.
The bottom line is that all Pool map functions consume the entire iterable they are given. If you'd like to consume the generator in chunks, you could do this instead:
import multiprocessing as mp
import itertools
import time

def g():
    for el in range(50):
        print(el)
        yield el

def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)   # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)
I had this problem too and was disappointed to learn that map consumes all its elements. I coded a function which consumes the iterator lazily using the Queue data type in multiprocessing. This is similar to what @unutbu describes in a comment to his answer but as he points out, suffers from having no callback mechanism for re-loading the Queue. The Queue datatype instead exposes a timeout parameter and I've used 100 milliseconds to good effect.
from multiprocessing import Process, Queue, cpu_count
from queue import Full as QueueFull
from queue import Empty as QueueEmpty

def worker(recvq, sendq):
    # Pull (function, args) pairs until the None sentinel arrives.
    for func, args in iter(recvq.get, None):
        result = func(*args)
        sendq.put(result)

def pool_imap_unordered(function, iterable, procs=cpu_count()):
    # Create queues for sending/receiving items from iterable.
    sendq = Queue(procs)
    recvq = Queue()
    # Start worker processes.
    for rpt in range(procs):
        Process(target=worker, args=(sendq, recvq)).start()
    # Iterate iterable and communicate with worker processes.
    send_len = 0
    recv_len = 0
    itr = iter(iterable)
    try:
        value = next(itr)
        while True:
            try:
                sendq.put((function, value), True, 0.1)
                send_len += 1
                value = next(itr)
            except QueueFull:
                # Drain available results while the send queue is full.
                while True:
                    try:
                        result = recvq.get(False)
                        recv_len += 1
                        yield result
                    except QueueEmpty:
                        break
    except StopIteration:
        pass
    # Collect all remaining results.
    while recv_len < send_len:
        result = recvq.get()
        recv_len += 1
        yield result
    # Terminate worker processes.
    for rpt in range(procs):
        sendq.put(None)
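A minimal usage sketch (my own illustration, not from the original answer). Note that each item of the iterable must be an argument tuple, because the worker calls func(*args):

def square(x):
    return x * x

if __name__ == '__main__':
    args = ((i,) for i in range(20))   # one argument tuple per task
    for result in pool_imap_unordered(square, args, procs=4):
        print(result)                  # results arrive in completion order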
64 KB is the default socket buffer size, which is why each value yielded below is 64 KB.
from multiprocessing import Pool
from time import sleep

def f(x):
    print("f()")
    sleep(3)
    return x

def get_reader():
    for x in range(10):
        print("read: ", x)
        value = " " * 1024 * 64   # 64 KB
        yield value

if __name__ == '__main__':
    p = Pool(processes=2)
    data = p.imap(f, get_reader())
    p.close()
    p.join()
I'm trying to process a file (every line is a JSON document). The size of the file can go up to hundreds of MBs, even GBs. So I wrote generator code to fetch each document line by line from the file.
import codecs
import json

def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document
My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the code for parallel processing:
threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % threads == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1
if files:
    parallelProcess(files, o)
    files = []
This is my code where the actual processing happens:
def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    # ...do some processing on doc
    o.write(json.dumps(doc) + '\n')   # o is the output file handle opened elsewhere

PS: The problem is that, since it is a generator, I cannot load all the files and use something like map to run the processes.
In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel.
import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)   # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):   # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()

This solution has the advantage of not batching requests to Pool.map: one individual worker cannot block others from making progress. YMMV. Note that you may want to use a different object to signal termination for the workers; in the example, None is used.
So I ended up running this successfully, by creating chunks of lines from my file and running the lines in parallel. Posting it here so it can be useful to somebody in the future.
def run_parallel(self, processes=4):
    processes = int(processes)
    try:
        pool = mp.Pool(processes)
        jobs = []
        # run for chunks of the file
        for chunkStart, chunkSize in self.chunkify(self.input_file):
            jobs.append(pool.apply_async(self.process_wrapper, (chunkStart, chunkSize)))
        for job in jobs:
            job.get()
        pool.close()
    except Exception as e:
        print(e)

def process_wrapper(self, chunkStart, chunkSize):
    with open(self.input_file) as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            document = json.loads(line)
            self.process_file(document)

# Splitting data into chunks for parallel processing
def chunkify(self, filename, size=1024 * 1024):
    fileEnd = os.path.getsize(filename)
    with open(filename, 'r') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)
            f.readline()   # advance to the next newline so chunks end on line boundaries
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break
Something like this:

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    for line in lines:
        yield heavy_processing(line)

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)
To make it faster, we have to execute the slow generator with multiple processes. The modified code looks like this:
import multiprocessing as mp

NCORE = 4

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    def gen_to_queue(input_q, lines):
        # This function simply consumes our generator and writes it to the input queue
        for line in lines:
            input_q.put(line)
        for _ in range(NCORE):   # Once the generator is consumed, send the end-signal
            input_q.put(None)

    def process(input_q, output_q):
        while True:
            line = input_q.get()
            if line is None:
                output_q.put(None)
                break
            output_q.put(heavy_processing(line))

    input_q = mp.Queue(maxsize=NCORE * 2)
    output_q = mp.Queue(maxsize=NCORE * 2)

    # Here we need 3 groups of workers:
    # * One that will consume the input generator and put it into a queue. It will be
    #   `gen_pool`. It's ok to have only 1 process doing this, since this is a very light task.
    # * One that does the main processing. It will be `pool`.
    # * One that reads the results and yields them back, to keep it as a generator.
    #   The main thread will do it.
    gen_pool = mp.Pool(1, initializer=gen_to_queue, initargs=(input_q, lines))
    pool = mp.Pool(NCORE, initializer=process, initargs=(input_q, output_q))

    finished_workers = 0
    while True:
        line = output_q.get()
        if line is None:
            finished_workers += 1
            if finished_workers == NCORE:
                break
        else:
            yield line

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)
You can also use the higher-level concurrent.futures module, like so:
import concurrent.futures as cf
import time
from concurrent.futures import ProcessPoolExecutor

def read(infile):
    with open(infile, "r") as f:
        for line in f:
            yield line.strip()

def process(line):
    # Simulate doing some heavy processing on `line`
    time.sleep(3)
    return line.upper()

def run_parallel(num_workers, lines):
    with ProcessPoolExecutor(max_workers=num_workers) as p:
        futures = {p.submit(process, line) for line in lines}
        for future in cf.as_completed(futures):
            yield future.result()

def write(outfile, lines):
    with open(outfile, "w") as f:
        for line in lines:
            f.write(line + "\n")

NUM_WORKERS = 4

if __name__ == "__main__":
    start = time.time()
    lines = read("infile.txt")
    lines = run_parallel(NUM_WORKERS, lines)
    write("outfile.txt", lines)
    print(time.time() - start)
Input file:
a
b
c
d
e
f
g
h
i
j
Output file (the order differs from the input because as_completed yields futures in completion order):
A
F
B
G
E
D
C
H
I
J
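If the output order needs to match the input order, a hedged variant of run_parallel using executor.map (which returns results in input order) could look like this; it reuses the process function from the script above:

def run_parallel_ordered(num_workers, lines):
    # ProcessPoolExecutor.map yields results in input order,
    # buffering later results until earlier ones are ready.
    with ProcessPoolExecutor(max_workers=num_workers) as p:
        yield from p.map(process, lines)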
Welcome to part 11 of the intermediate Python programming tutorial series. In this part, we're going to talk more about the built-in library multiprocessing. Pool allows us to create a pool of worker processes.
To begin, we're going to import Pool:

from multiprocessing import Pool
Let's say we want to run a function over each item in an iterable. Let's just do:
def job(num):
    return num * 2
Simple enough, now let's set up the processes:
if __name__ == '__main__':
    p = Pool(processes=20)
    data = p.map(job, [i for i in range(20)])
    p.close()
    print(data)
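Since Pool.map returns results in the same order as its inputs, this should print the doubled values:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]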
#!/usr/bin/python

from multiprocessing import Process

def fun(name):
    print(f'hello {name}')

def main():
    p = Process(target=fun, args=('Peter',))
    p.start()

if __name__ == '__main__':
    main()
We create a new process and pass a value to it.
def fun(name):
    print(f'hello {name}')
The function prints the passed parameter.
def main():
    p = Process(target=fun, args=('Peter',))
    p.start()
#!/usr/bin/python

from multiprocessing import Process
import time

def fun():
    print('starting fun')
    time.sleep(2)
    print('finishing fun')

def main():
    p = Process(target=fun)
    p.start()
    p.join()

if __name__ == '__main__':
    print('starting main')
    main()
    print('finishing main')
The example calls join on the newly created process, so the main program waits until fun finishes before printing its final message.
$ ./joining.py
starting main
starting fun
finishing fun
finishing main
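For contrast, a minimal sketch with the join call removed (my own illustration, not part of the original example): without join, the parent typically prints 'finishing main' before the child prints 'finishing fun'.

#!/usr/bin/python

from multiprocessing import Process
import time

def fun():
    print('starting fun')
    time.sleep(2)
    print('finishing fun')

if __name__ == '__main__':
    print('starting main')
    p = Process(target=fun)
    p.start()                 # no join: main does not wait for the child
    print('finishing main')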