Multiprocessing module for updating a shared dictionary in Python

If you want to get the counts ignoring order, use a frozenset with Counter:

from collections import Counter

print(Counter(map(frozenset, y)))

Using the tuples from another answer:

In [9]: len(tuples)
Out[9]: 500000

In [10]: timeit Counter(map(frozenset, tuples))
1 loops, best of 3: 582 ms per loop

Using a frozenset means (1, 2) and (2, 1) are counted as the same key:

In [12]: y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1), (6, 5)]

In [13]: from collections import Counter

In [14]: print(Counter(map(frozenset, y)))
Counter({frozenset({1, 2}): 3, frozenset({5, 6}): 2, frozenset({2, 3}): 1})

You can follow a MapReduce approach.

from collections import Counter
from functools import reduce
from multiprocessing import Pool

NUM_PROCESSES = 8

y = [(1, 2), (2, 3), (1, 2), (5, 6)] * 10

# http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

# map: count each chunk in a separate worker process
# (note that chunks(y, NUM_PROCESSES) yields chunks of size NUM_PROCESSES, not NUM_PROCESSES chunks)
partial_counters = Pool(NUM_PROCESSES).map(Counter, chunks(y, NUM_PROCESSES))

# reduce: merge the partial Counters into a single one
reduced_counter = reduce(Counter.__add__, partial_counters)

# Result is:
# Counter({(1, 2): 20, (5, 6): 10, (2, 3): 10})

First of all, instead of checking membership of tup in dict.keys() on every iteration, which is a really bad idea, you can use collections.defaultdict() for this, which is more Pythonic:

from collections import defaultdict

test_dict = defaultdict(int)

for tup in y:
    tup = tuple(sorted(tup))
    test_dict[tup] += 1

Simply refactor your code to have the processing done in a function:

def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_ = tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict

Then split the tuples list into small groups and use pool.map() to process all of the groups in parallel:

# http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

# cut the tuples list into 5 roughly equal chunks (chunks() takes a chunk size)
tuples_groups = chunks(tuples, len(tuples) // 5 + 1)
pool = Pool(5)
count_dict = {}
# process the chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# collect results, summing counts for keys that appear in several chunks
for result in results:
    for key, value in result.items():
        count_dict[key] = count_dict.get(key, 0) + value

Full example + benchmarks:

import time
import random

start_time = time.time()
tuples = []
x, y = (100000, 10)
for i in range(x):
    tuple_ = []
    for j in range(y):
        tuple_.append(random.randint(0, 9))
    tuples.append(tuple(tuple_))

print("--- %s data generated in %s seconds ---" % (x * y, time.time() - start_time))

def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_ = tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict

from multiprocessing import Pool

start_time = time.time()

# http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

# cut the tuples list into 5 roughly equal chunks (chunks() takes a chunk size)
tuples_groups = chunks(tuples, len(tuples) // 5 + 1)
pool = Pool(5)
count_dict = {}
# process the chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# collect results, summing counts for keys that appear in several chunks
for result in results:
    for key, value in result.items():
        count_dict[key] = count_dict.get(key, 0) + value

print("--- Multithread processed in %s seconds ---" % (time.time() - start_time))

start_time = time.time()
count_dict = {}
for tuple_ in tuples:
    tuple_ = tuple(sorted(tuple_))
    if tuple_ in count_dict:
        count_dict[tuple_] += 1
    else:
        count_dict[tuple_] = 1

print("--- Single thread processed in %s seconds ---" % (time.time() - start_time))

Suggestion : 2

multiprocessing.shared_memory — Shared memory for direct access across processes

This module provides a class, SharedMemory, for the allocation and management of shared memory to be accessed by one or more processes on a multicore or symmetric multiprocessor (SMP) machine. To assist with the life-cycle management of shared memory, especially across distinct processes, a BaseManager subclass, SharedMemoryManager, is also provided in the multiprocessing.managers module. SharedMemoryManager can be used for the management of shared memory blocks across processes; it provides methods for creating and returning SharedMemory instances and for creating a list-like object (ShareableList) backed by shared memory.

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory
>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl
>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl
>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

Suggestion : 3


Without sharing, changes a child process makes to an ordinary Python list are not visible in the main program:

  Result(in process p1): [1, 4, 9, 16]
  Result(in main program): []

Using multiprocessing.Array and multiprocessing.Value, the child's results do become visible in the main program:

  Result(in process p1): [1, 4, 9, 16]
  Sum of squares(in process p1): 30
  Result(in main program): [1, 4, 9, 16]
  Sum of squares(in main program): 30

result is created as a shared array of 4 integers:

  result = multiprocessing.Array('i', 4)

Similarly, we create a Value square_sum like this:

  square_sum = multiprocessing.Value('i')

Here, we only need to specify the data type. The value can be given an initial value (say 10) like this:

  square_sum = multiprocessing.Value('i', 10)

Both shared objects are passed to the worker process along with the input list:

  p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))

Inside the worker, result is filled element by element:

  for idx, num in enumerate(mylist):
     result[idx] = num * num

and square_sum is given a value by using its value attribute:

  square_sum.value = sum(result)
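
For reference, a minimal self-contained version of the program these fragments come from might look like this (the square_list body and the printed labels follow the fragments above; treat it as a sketch rather than the exact original):

import multiprocessing

def square_list(mylist, result, square_sum):
    # fill the shared Array with the squares of the input list
    for idx, num in enumerate(mylist):
        result[idx] = num * num
    # store the total in the shared Value
    square_sum.value = sum(result)
    print("Result(in process p1): {}".format(result[:]))
    print("Sum of squares(in process p1): {}".format(square_sum.value))

if __name__ == "__main__":
    mylist = [1, 2, 3, 4]
    result = multiprocessing.Array('i', 4)    # shared array of 4 ints
    square_sum = multiprocessing.Value('i')   # shared int
    p1 = multiprocessing.Process(target=square_list,
                                 args=(mylist, result, square_sum))
    p1.start()
    p1.join()
    print("Result(in main program): {}".format(result[:]))
    print("Sum of squares(in main program): {}".format(square_sum.value))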

Suggestion : 4

As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated. A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth; any pickle-able object can pass through a Queue. The Event class, shown further below, provides a simple way to communicate state information between processes: an event can be toggled between set and unset states, and users of the event object can wait for it to change from unset to set, using an optional timeout value. This first short example only passes a single message to a single worker, then the main process waits for the worker to finish.

import multiprocessing

class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s!' % (proc_name, self.name)

def worker(q):
    obj = q.get()
    obj.do_something()

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
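
The same pattern can be tied back to the original question: each worker can build its own count dictionary, put it on a Queue, and the parent merges what it receives. A minimal sketch, written for Python 3 (the count_worker helper and the merge loop are illustrative, not part of the original example):

import multiprocessing

def count_worker(tuples, q):
    # build a local count dict and send it back through the queue
    counts = {}
    for tuple_ in tuples:
        tuple_ = tuple(sorted(tuple_))
        counts[tuple_] = counts.get(tuple_, 0) + 1
    q.put(counts)

if __name__ == '__main__':
    y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1), (6, 5)] * 10
    q = multiprocessing.Queue()
    half = len(y) // 2
    workers = [multiprocessing.Process(target=count_worker, args=(chunk, q))
               for chunk in (y[:half], y[half:])]
    for w in workers:
        w.start()

    # merge the partial dictionaries as they arrive
    count_dict = {}
    for _ in workers:
        partial = q.get()
        for key, value in partial.items():
            count_dict[key] = count_dict.get(key, 0) + value

    for w in workers:
        w.join()

    print(count_dict)   # {(1, 2): 30, (2, 3): 10, (5, 6): 20}
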
import multiprocessing
import time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [Consumer(tasks, results)
                 for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1
$ python -u multiprocessing_producer_consumer.py

Creating 16 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-9: 8 * 8
Consumer-10: 9 * 9
Consumer-11: Exiting
Consumer-12: Exiting
Consumer-13: Exiting
Consumer-14: Exiting
Consumer-15: Exiting
Consumer-16: Exiting
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-5: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-3: Exiting
Consumer-9: Exiting
Consumer-7: Exiting
Consumer-8: Exiting
Consumer-10: Exiting
Result: 0 * 0 = 0
Result: 3 * 3 = 9
Result: 8 * 8 = 64
Result: 5 * 5 = 25
Result: 4 * 4 = 16
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 9 * 9 = 81
import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print 'wait_for_event: starting'
    e.wait()
    print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print 'wait_for_event_timeout: starting'
    e.wait(t)
    print 'wait_for_event_timeout: e.is_set()->', e.is_set()

if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block',
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='non-block',
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w2.start()

    print 'main: waiting before calling Event.set()'
    time.sleep(3)
    e.set()
    print 'main: event is set'
$ python -u multiprocessing_event.py

main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True