python: yield in multiprocessing pool


When a function containing a yield statement is called, it doesn't actually run the code but returns a generator instead:

>>> p = parallel(1, 2, 3)
>>> p
<generator object parallel at 0x7fde9c1daf00>

Then, when the next value is required, the code will run until a value is yielded:

>>> next(p)
([10000], 6)
>>> next(p)
(6, [10000])

If you want to use a generator, you could change your code a bit to target a function that creates a list from the generator:

def parallel2(x, y, z):
    return list(parallel(x, y, z))

def collect_results(lst):
    results.extend(lst)

def apply_async_with_callback():
    pool = mp.Pool()
    for _ in range(10):
        pool.apply_async(parallel2, args=(2, 5, 7),
                         callback=collect_results)
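
As written, apply_async_with_callback never waits for the workers to finish. A minimal sketch of how it might be completed, assuming a module-level results list and a stand-in for the question's parallel generator (whose real body is not shown here):

import multiprocessing as mp

results = []

def parallel(x, y, z):
    # stand-in for the question's generator; it yields two values
    yield [x * y * z]
    yield x + y + z

def parallel2(x, y, z):
    return list(parallel(x, y, z))

def collect_results(lst):
    results.extend(lst)

def apply_async_with_callback():
    pool = mp.Pool()
    for _ in range(10):
        pool.apply_async(parallel2, args=(2, 5, 7),
                         callback=collect_results)
    pool.close()   # no more tasks will be submitted
    pool.join()    # wait for the workers and the result handler to finish
    print(results)

if __name__ == '__main__':
    apply_async_with_callback()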

Suggestion : 2

The Pool class represents a pool of worker processes. It has methods which allow tasks to be offloaded to the worker processes in a few different ways. A process pool object controls a pool of worker processes to which jobs can be submitted; it supports asynchronous results with timeouts and callbacks and has a parallel map implementation. A thread pool object controls a pool of worker threads to which jobs can be submitted. ThreadPool instances are fully interface compatible with Pool instances, and their resources must also be properly managed, either by using the pool as a context manager or by calling close() and terminate() manually. In particular, the Pool function provided by multiprocessing.dummy returns an instance of ThreadPool, which is a subclass of Pool that supports all the same method calls but uses a pool of worker threads rather than worker processes.
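
As a quick illustration of that interface compatibility, here is a minimal sketch that swaps the process pool for the thread pool from multiprocessing.dummy (the squaring function is just a placeholder):

from multiprocessing.dummy import Pool as ThreadPool

def f(x):
    return x * x

if __name__ == '__main__':
    # same API as multiprocessing.Pool, but backed by threads
    with ThreadPool(4) as p:
        print(p.map(f, [1, 2, 3]))   # [1, 4, 9]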

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

which will print:

[1, 4, 9]

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Suggestion : 3

When the main thread is waiting instead of calling _exit_function, the task handler thread runs normally, which means it fully consumes the generator as it puts tasks on the workers' inqueue in the Pool._handle_tasks function. The _exit_function, however, eventually calls Pool._terminate_pool: the main thread changes the state of pool._task_handler._state from RUN to TERMINATE. Meanwhile the pool._task_handler thread is looping in Pool._handle_tasks and bails out when it reaches the condition

if thread._state:
    debug('task handler found thread._state != RUN')
    break

This is what stops the task handler from fully consuming your generator, g(). If you look in Pool._handle_tasks you'll see

for i, task in enumerate(taskseq):
    ...
    try:
        put(task)
    except IOError:
        debug('could not put task on queue')
        break

The bottom line is that all of the Pool map functions consume the entire iterable they are given. If you'd like to consume the generator in chunks, you could do this instead:

import multiprocessing as mp
import itertools
import time

def g():
    for el in range(50):
        print(el)
        yield el

def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)   # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)
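
The chunking works because itertools.islice only pulls N items from the underlying generator each time it is iterated. A small sketch without a pool, just to illustrate that behaviour:

import itertools

def g():
    for el in range(7):
        print("producing", el)
        yield el

gen = g()
print(list(itertools.islice(gen, 3)))  # pulls only the first 3 items: [0, 1, 2]
print(list(itertools.islice(gen, 3)))  # pulls the next 3: [3, 4, 5]
print(list(itertools.islice(gen, 3)))  # only one item left: [6]
print(list(itertools.islice(gen, 3)))  # generator exhausted: []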

I had this problem too and was disappointed to learn that map consumes all its elements. I coded a function which consumes the iterator lazily using the Queue data type in multiprocessing. This is similar to what @unutbu describes in a comment to his answer but, as he points out, it suffers from having no callback mechanism for re-loading the Queue. The Queue datatype instead exposes a timeout parameter, and I've used 100 milliseconds to good effect.

from multiprocessing import Process, Queue, cpu_count
from queue import Full as QueueFull
from queue import Empty as QueueEmpty

def worker(recvq, sendq):
    for func, args in iter(recvq.get, None):
        result = func(*args)
        sendq.put(result)

def pool_imap_unordered(function, iterable, procs=cpu_count()):
    # Create queues for sending/receiving items from iterable.
    sendq = Queue(procs)
    recvq = Queue()

    # Start worker processes.
    for rpt in range(procs):
        Process(target=worker, args=(sendq, recvq)).start()

    # Iterate iterable and communicate with worker processes.
    send_len = 0
    recv_len = 0
    itr = iter(iterable)
    try:
        value = next(itr)
        while True:
            try:
                sendq.put((function, value), True, 0.1)
                send_len += 1
                value = next(itr)
            except QueueFull:
                while True:
                    try:
                        result = recvq.get(False)
                        recv_len += 1
                        yield result
                    except QueueEmpty:
                        break
    except StopIteration:
        pass

    # Collect all remaining results.
    while recv_len < send_len:
        result = recvq.get()
        recv_len += 1
        yield result

    # Terminate worker processes.
    for rpt in range(procs):
        sendq.put(None)

This solution has the advantage of not batching requests to Pool.map. One individual worker cannot block others from making progress. YMMV. Note that you may want to use a different object to signal termination for the workers. In the example, I've used None.
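
A minimal usage sketch for the helper above, assuming a trivial square worker; note that each item must be an argument tuple, since the worker calls func(*args):

def square(x):
    return x * x

if __name__ == '__main__':
    # results arrive in completion order, not input order
    for result in pool_imap_unordered(square, ((i,) for i in range(10))):
        print(result)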

64 KB is the default socket buffer size; the generator below yields values of exactly that size:

from multiprocessing import Pool
from time import sleep

def f(x):
    print("f()")
    sleep(3)
    return x

def get_reader():
    for x in range(10):
        print("read:", x)
        value = " " * 1024 * 64  # 64 KB
        yield value

if __name__ == '__main__':
    p = Pool(processes=2)
    data = p.imap(f, get_reader())
    p.close()
    p.join()
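
Note that p.imap returns a lazy iterator; the snippet above never iterates data, so the return values are never read. A small sketch of how the results might be consumed, reusing the placeholder f and get_reader from above:

if __name__ == '__main__':
    with Pool(processes=2) as p:
        # imap yields results lazily, in input order, as the workers finish them
        for value in p.imap(f, get_reader()):
            print(len(value))   # each value is the 64 KB string returned by f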

Suggestion : 4


I'm trying to process a file (every line is a JSON document). The file can be anywhere from hundreds of megabytes to gigabytes in size, so I wrote generator code to fetch each document from the file line by line. Since it's a generator, I cannot load all the documents at once and use something like map to run the processing.

def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document

My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the code for parallel processing:

threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % threads == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1

if files:
    parallelProcess(files, o)
    files = []

This is my code where the actual processing happens:

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    # ...do some processing on doc
    o.write(json.dumps(doc) + '\n')

In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel.

import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q is below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
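
A sketch of how that pattern might be adapted to the question, feeding raw lines into the queue and letting each worker call json.loads itself; the file name and process_document are placeholders:

import json
import multiprocessing as mp

NCORE = 4

def worker(q):
    while True:
        line = q.get()
        if line is None:                  # end-of-input sentinel
            break
        document = json.loads(line)       # parse in the worker, in parallel
        # process_document(document)      # placeholder for the real per-document work

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    pool = mp.Pool(NCORE, initializer=worker, initargs=(q,))
    with open('input.jsonl', 'r', encoding='utf-8') as f:
        for line in f:
            q.put(line)                   # blocks while the queue is full
    for _ in range(NCORE):
        q.put(None)                       # one sentinel per worker
    pool.close()
    pool.join()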

So I ended up running this successfully, by creating chunks of lines from my file and running the chunks in parallel. Posting it here so it can be useful to somebody in the future.

def run_parallel(self, processes=4):
    processes = int(processes)
    pool = mp.Pool(processes)
    try:
        pool = mp.Pool(processes)
        jobs = []
        # run for chunks of files
        for chunkStart, chunkSize in self.chunkify(input_path):
            jobs.append(pool.apply_async(self.process_wrapper, (chunkStart, chunkSize)))
        for job in jobs:
            job.get()
        pool.close()
    except Exception as e:
        print(e)

def process_wrapper(self, chunkStart, chunkSize):
    # binary mode so the byte offsets produced by chunkify line up
    with open(self.input_file, 'rb') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            document = json.loads(line)
            self.process_file(document)

# Splitting data into chunks for parallel processing
def chunkify(self, filename, size=1024 * 1024):
    fileEnd = os.path.getsize(filename)
    # binary mode: relative seeks on text-mode files are not allowed in Python 3
    with open(filename, 'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)
            f.readline()
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break
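
The key property of chunkify is that f.readline() after the relative seek extends each chunk to the next newline, so every chunk ends on a line boundary and no document is split across workers. A small standalone sketch of that behaviour, assuming a hypothetical sample.jsonl and a deliberately tiny chunk size:

import os

def chunkify(filename, size=64):
    # same logic as above, outside a class, with a tiny chunk size for the demo
    fileEnd = os.path.getsize(filename)
    with open(filename, 'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)        # jump roughly `size` bytes ahead...
            f.readline()           # ...then finish the current line
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break

if __name__ == '__main__':
    for start, length in chunkify('sample.jsonl'):
        with open('sample.jsonl', 'rb') as f:
            f.seek(start)
            lines = f.read(length).splitlines()
        print(start, length, len(lines))   # every chunk holds whole lines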

Something like this:

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    for line in lines:
        yield heavy_processing(line)

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

To make it faster, we have to execute the slow generator with multiple processes. The modified code looks like this:

import multiprocessing as mp

NCORE = 4

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    def gen_to_queue(input_q, lines):
        # This function simply consumes our generator and writes it to the input queue
        for line in lines:
            input_q.put(line)
        for _ in range(NCORE):    # Once the generator is consumed, send the end-signal
            input_q.put(None)

    def process(input_q, output_q):
        while True:
            line = input_q.get()
            if line is None:
                output_q.put(None)
                break
            output_q.put(heavy_processing(line))

    input_q = mp.Queue(maxsize=NCORE * 2)
    output_q = mp.Queue(maxsize=NCORE * 2)

    # Here we need 3 groups of workers:
    # * One that will consume the input generator and put it into a queue. It will be
    #   `gen_pool`. It's ok to have only 1 process doing this, since it is a very light task.
    # * One that does the main processing. It will be `pool`.
    # * One that reads the results and yields them back, to keep it as a generator.
    #   The main thread will do it.
    gen_pool = mp.Pool(1, initializer=gen_to_queue, initargs=(input_q, lines))
    pool = mp.Pool(NCORE, initializer=process, initargs=(input_q, output_q))

    finished_workers = 0
    while True:
        line = output_q.get()
        if line is None:
            finished_workers += 1
            if finished_workers == NCORE:
                break
        else:
            yield line

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

You can also use the higher-level concurrent.futures module like so:

import concurrent.futures as cf
import time
from concurrent.futures import ProcessPoolExecutor

def read(infile):
    with open(infile, "r") as f:
        for line in f:
            yield line.strip()

def process(line):
    # Simulate doing some heavy processing on `line`
    time.sleep(3)
    return line.upper()

def run_parallel(num_workers, lines):
    with ProcessPoolExecutor(max_workers=num_workers) as p:
        futures = {p.submit(process, line) for line in lines}
        for future in cf.as_completed(futures):
            yield future.result()

def write(outfile, lines):
    with open(outfile, "w") as f:
        for line in lines:
            f.write(line + "\n")

NUM_WORKERS = 4

if __name__ == "__main__":
    start = time.time()
    lines = read("infile.txt")
    lines = run_parallel(NUM_WORKERS, lines)
    write("outfile.txt", lines)
    print(time.time() - start)

Input file:

a
b
c
d
e
f
g
h
i
j

Output file:

A
F
B
G
E
D
C
H
I
J
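
Because as_completed yields futures in completion order, the output lines come back shuffled relative to the input, as the files above show. If order matters, here is a minimal alternative sketch using executor.map, which returns results in input order (it reuses the placeholder process function from above):

from concurrent.futures import ProcessPoolExecutor

def run_parallel_ordered(num_workers, lines):
    with ProcessPoolExecutor(max_workers=num_workers) as p:
        # map() yields results in the same order as the inputs
        yield from p.map(process, lines)

# usage: lines = run_parallel_ordered(NUM_WORKERS, read("infile.txt"))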

Suggestion : 5

Welcome to part 11 of the intermediate Python programming tutorial series. In this part, we're going to talk more about the built-in library: multiprocessing. Pool allows us to create a pool of worker processes.

To begin, we're going to import Pool

from multiprocessing import Pool

Let's say we want to run a function over each item in an iterable. Let's just do:

def job(num):
   return num * 2

Simple enough, now let's set up the processes:

if __name__ == '__main__':
    p = Pool(processes=20)
    data = p.map(job, [i for i in range(20)])
    p.close()
    print(data)
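
If you want results from individually submitted tasks rather than one mapped list, here is a minimal sketch using apply_async with the same placeholder job function (the worker count here is arbitrary):

if __name__ == '__main__':
    with Pool(processes=4) as p:
        # submit tasks individually and collect the results later
        async_results = [p.apply_async(job, (i,)) for i in range(20)]
        data = [r.get() for r in async_results]
    print(data)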

Suggestion : 6


#!/usr/bin/python

from multiprocessing import Process

def fun(name):
    print(f'hello {name}')

def main():

    p = Process(target=fun, args=('Peter',))
    p.start()

if __name__ == '__main__':
    main()

We create a new process and pass a value to it.

def fun(name):
    print(f'hello {name}')

The function prints the passed parameter.

def main():

    p = Process(target=fun, args=('Peter',))
    p.start()
#!/usr/bin/python

from multiprocessing import Process
import time

def fun():

    print('starting fun')
    time.sleep(2)
    print('finishing fun')

def main():

    p = Process(target=fun)
    p.start()
    p.join()

if __name__ == '__main__':

    print('starting main')
    main()
    print('finishing main')

The example calls the join on the newly created process.

$ ./joining.py
starting main
starting fun
finishing fun
finishing main