If you want to get the counts ignoring order, use a frozenset with Counter:

from collections import Counter

print(Counter(map(frozenset, y)))
Using the tuples from another answer:

In [9]: len(tuples)
Out[9]: 500000

In [10]: timeit Counter(map(frozenset, tuples))
1 loops, best of 3: 582 ms per loop
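The tuples list used in this timing comes from another answer and is not reproduced here; a minimal sketch of comparable test data (500,000 random pairs, an assumption purely for illustration) might look like this:

import random
from collections import Counter

# Hypothetical test data: 500,000 random pairs of small ints (not the
# exact data from the other answer, just something comparable in size).
tuples = [(random.randint(0, 9), random.randint(0, 9)) for _ in range(500000)]

# Order-insensitive counting, as benchmarked above.
counts = Counter(map(frozenset, tuples))
print(counts.most_common(3))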
Using a frozenset will mean (1, 2) and (2, 1) are considered the same:
In [12]: y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1), (6, 5)]

In [13]: from collections import Counter

In [14]: print(Counter(map(frozenset, y)))
Counter({frozenset({1, 2}): 3, frozenset({5, 6}): 2, frozenset({2, 3}): 1})
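One caveat worth noting (not raised in the original answer): a frozenset collapses repeated elements, so a pair like (1, 1) becomes frozenset({1}). If your data can contain such pairs and they must stay distinct from single-element pairs, counting on sorted tuples is a safer key; a minimal sketch:

from collections import Counter

y = [(1, 2), (2, 1), (1, 1)]

# frozenset({1, 2}) for the first two pairs, frozenset({1}) for the last
print(Counter(map(frozenset, y)))

# tuple(sorted(...)) keeps (1, 1) as a two-element key
print(Counter(tuple(sorted(t)) for t in y))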
You can follow a MapReduce approach.
from collections import Counter
from functools import reduce
from multiprocessing import Pool

NUM_PROCESSES = 8

y = [(1, 2), (2, 3), (1, 2), (5, 6)] * 10

# http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

if __name__ == '__main__':
    # map: count each chunk in a separate worker process
    partial_counters = Pool(NUM_PROCESSES).map(Counter, chunks(y, NUM_PROCESSES))

    # reduce: merge the partial counters into one
    reduced_counter = reduce(Counter.__add__, partial_counters)

    # Result is:
    # Counter({(1, 2): 20, (5, 6): 10, (2, 3): 10})
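Note that the map step above counts the raw tuples, so (1, 2) and (2, 1) are tallied separately. If order-insensitive counts are wanted, as in the frozenset answer, the normalization can be folded into the mapped function; a minimal sketch, where the helper name count_normalized is my own rather than from the original answer:

from collections import Counter
from functools import reduce
from multiprocessing import Pool

def count_normalized(chunk):
    # Normalize each pair before counting so order does not matter.
    return Counter(tuple(sorted(t)) for t in chunk)

def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

if __name__ == '__main__':
    y = [(1, 2), (2, 1), (5, 6), (6, 5)] * 10
    with Pool(4) as pool:
        partials = pool.map(count_normalized, chunks(y, 10))
    print(reduce(Counter.__add__, partials))
    # Counter({(1, 2): 20, (5, 6): 20})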
First of all, instead of checking whether tup is in dict.keys() on every iteration (which is slow and unnecessary), you can use collections.defaultdict(), which is more Pythonic:
from collections import defaultdict

test_dict = defaultdict(int)  # missing keys start at 0
for tup in y:
    tup = tuple(sorted(tup))
    test_dict[tup] += 1
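For comparison (an addition of mine, not part of the original answer), collections.Counter collapses the same loop into a single expression:

from collections import Counter

y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1), (6, 5)]

# Equivalent order-insensitive counts in one line.
test_dict = Counter(tuple(sorted(tup)) for tup in y)
print(test_dict)  # Counter({(1, 2): 3, (5, 6): 2, (2, 3): 1})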
Simply refactor your code to have the processing done in a function:
def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_ = tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict
Split the tuples list into small groups, then use map to process all of the groups.
from multiprocessing import Pool

# http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

# cut the tuples list into roughly 5 chunks
tuples_groups = chunks(tuples, len(tuples) // 5 + 1)
pool = Pool(5)
count_dict = {}
# process the chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# merge the partial counts (a plain dict.update() would overwrite the
# count for any tuple that appears in more than one chunk)
for result in results:
    for key, value in result.items():
        count_dict[key] = count_dict.get(key, 0) + value
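As a side note (my suggestion, not part of the original answer), building the partial results as collections.Counter objects makes the merge a one-liner and avoids the overwrite pitfall mentioned above:

from collections import Counter

# Assuming `results` is the list of per-chunk dicts returned by pool.map above.
count_dict = sum(map(Counter, results), Counter())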
Full example + benchmarks:
import time
import random

start_time = time.time()
tuples = []
x, y = (100000, 10)
for i in range(x):
    tuple_ = []
    for j in range(y):
        tuple_.append(random.randint(0, 9))
    tuples.append(tuple(tuple_))
print("--- %s data generated in %s seconds ---" % (x * y, time.time() - start_time))

def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_ = tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict

from multiprocessing import Pool

start_time = time.time()

# http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

# cut the tuples list into roughly 5 chunks
tuples_groups = chunks(tuples, len(tuples) // 5 + 1)
pool = Pool(5)
count_dict = {}
# process the chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# merge the partial counts
for result in results:
    for key, value in result.items():
        count_dict[key] = count_dict.get(key, 0) + value
print("--- Multiprocess version finished in %s seconds ---" % (time.time() - start_time))

start_time = time.time()
count_dict = {}
for tuple_ in tuples:
    tuple_ = tuple(sorted(tuple_))
    if tuple_ in count_dict:
        count_dict[tuple_] += 1
    else:
        count_dict[tuple_] = 1
print("--- Single process version finished in %s seconds ---" % (time.time() - start_time))
multiprocessing.shared_memory — Shared memory for direct access across processes

This module provides a class, SharedMemory, for the allocation and management of shared memory to be accessed by one or more processes on a multicore or symmetric multiprocessor (SMP) machine. To assist with the life-cycle management of shared memory, especially across distinct processes, a BaseManager subclass, SharedMemoryManager, is also provided in the multiprocessing.managers module.

SharedMemoryManager is a subclass of BaseManager which can be used for the management of shared memory blocks across processes. This class provides methods for creating and returning SharedMemory instances and for creating a list-like object (ShareableList) backed by shared memory.
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once
>>> buffer[4] = 100 # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5]) # Access via shm_a
b'howdy'
>>> shm_b.close() # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink() # Call unlink only once to release the shared memory
>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8]) # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:] # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name # We did not specify a name so one was chosen for us
'psm_21467_46075'
>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([ 1, 1, 2, 3, 5, 888])
>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([ 1, 1, 2, 3, 5, 888])
>>> # Clean up from within the second Python shell
>>> del c # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()
>>> # Clean up from within the first Python shell
>>> del b # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink() # Free and release the shared memory block at the very end
>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl
>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl
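The docs example above assumes a do_work(shareable_list, start, stop) function that is not shown; a minimal sketch of a worker that would fit, purely as an assumption for illustration:

from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager

def do_work(shareable_list, start, stop):
    # Hypothetical worker: square each entry in the assigned slice in place.
    for i in range(start, stop):
        shareable_list[i] = shareable_list[i] ** 2

if __name__ == '__main__':
    with SharedMemoryManager() as smm:
        sl = smm.ShareableList(range(2000))
        p1 = Process(target=do_work, args=(sl, 0, 1000))
        p2 = Process(target=do_work, args=(sl, 1000, 2000))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print(sum(sl))  # Sum of the squares of 0..1999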
>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice' # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a # Use of a ShareableList after call to unlink() is unsupported
>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()
Without shared memory, changes made inside process p1 are not visible in the main program:

Result(in process p1): [1, 4, 9, 16]
Result(in main program): []

Using shared memory (Array and Value), both processes see the same data:

Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30
result = multiprocessing.Array('i', 4)

Here, we create an Array named result with data type 'i' (signed int) and length 4. Similarly, we create a Value square_sum like this:

square_sum = multiprocessing.Value('i')

Here, we only need to specify the data type. The value can be given an initial value (say 10) like this:

square_sum = multiprocessing.Value('i', 10)

Both shared objects are passed to the worker process as arguments:

p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))

Inside the worker, the shared Array is filled element by element:

for idx, num in enumerate(mylist):
    result[idx] = num * num

square_sum is given a value by using its value attribute:

square_sum.value = sum(result)
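Putting those pieces together, here is a minimal runnable sketch of the shared Array/Value example (a reconstruction consistent with the outputs shown above, not a verbatim copy of the tutorial's code):

import multiprocessing

def square_list(mylist, result, square_sum):
    # Fill the shared Array with squares.
    for idx, num in enumerate(mylist):
        result[idx] = num * num
    # Store the sum of squares in the shared Value.
    square_sum.value = sum(result)
    print("Result(in process p1): {}".format(result[:]))
    print("Sum of squares(in process p1): {}".format(square_sum.value))

if __name__ == "__main__":
    mylist = [1, 2, 3, 4]
    result = multiprocessing.Array('i', 4)   # shared int array of length 4
    square_sum = multiprocessing.Value('i')  # shared int value
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
    p1.start()
    p1.join()
    print("Result(in main program): {}".format(result[:]))
    print("Sum of squares(in main program): {}".format(square_sum.value))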
As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated.

A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any pickle-able object can pass through a Queue. The Event class provides a simple way to communicate state information between processes: an event can be toggled between set and unset states, and users of the event object can wait for it to change from unset to set, using an optional timeout value.

This short example only passes a single message to a single worker, then the main process waits for the worker to finish.
import multiprocessing


class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in %s for %s!' % (proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
$ python -u multiprocessing_producer_consumer.py

Creating 16 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-9: 8 * 8
Consumer-10: 9 * 9
Consumer-11: Exiting
Consumer-12: Exiting
Consumer-13: Exiting
Consumer-14: Exiting
Consumer-15: Exiting
Consumer-16: Exiting
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-5: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-3: Exiting
Consumer-9: Exiting
Consumer-7: Exiting
Consumer-8: Exiting
Consumer-10: Exiting
Result: 0 * 0 = 0
Result: 3 * 3 = 9
Result: 8 * 8 = 64
Result: 5 * 5 = 25
Result: 4 * 4 = 16
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 9 * 9 = 81
import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block',
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='non-block',
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')
$ python -u multiprocessing_event.py

main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True