Your first attempt at making access to an iterator thread-safe seems quite reasonable. You can make it a bit more readable by using a generator:
def locked_iter(it):
    """Yield items from *it*, serializing each next() call with a lock.

    Wrapping the fetch (not the yield) in the lock means consumers in
    different threads never advance the underlying iterator concurrently.
    """
    source = iter(it)
    guard = threading.Lock()
    while True:
        with guard:
            try:
                value = next(source)
            except StopIteration:
                # Underlying iterator exhausted: end this generator too.
                return
        yield value
Because sometimes it's more practical than designing a solution around queues, I would like to write a simple wrapper to make an iterator thread-safe. So far, I had inspiration from these topics and came up with two ideas.

Your first attempt at making access to an iterator thread-safe seems quite reasonable. You can make it a bit more readable by using a generator.

This is more concise, but it can only intercept calls, and not, for example, direct property changes. (Those properties are now hidden to prevent problems.) More importantly, it means Python no longer recognizes my object as an iterator!

What is the best way of making this work for all iterators (or even better: all objects), without creating a leaky abstraction? I'm not too worried about locking when it's not necessary, but if you can come up with a solution that circumvents that, great!
Idea 1
class LockedIterator(object):
    """Idea 1: wrap an iterator so each next()/close() call holds a lock."""

    def __init__(self, it):
        self._lock = threading.Lock()
        self._it = it.__iter__()
        if hasattr(self._it, 'close'):
            # The function is stored on the *instance*, so it is invoked
            # WITHOUT an implicit self argument.  The original declared
            # `def close(self)` here, which made wrapper.close() raise
            # TypeError (missing positional argument).  Close over self
            # instead of taking it as a parameter.
            def close():
                with self._lock:
                    self._it.close()
            self.close = close

    def __iter__(self):
        return self

    def next(self):
        with self._lock:
            # Builtin next() delegates to .next() on Py2 and .__next__()
            # on Py3, so this works with either kind of wrapped iterator
            # (the original self._it.next() was Py2-only).
            return next(self._it)

    __next__ = next  # make the wrapper a Python 3 iterator as well
Idea 2
class LockedIterator(object):
    """Idea 2: proxy that serializes every method call on the wrapped iterator.

    Method lookups are intercepted via __getattr__, wrapped in a lock,
    and cached on the instance so the hook is built once per name.
    """

    def __init__(self, it):
        self._lock = threading.Lock()
        self._it = it.__iter__()

    def __getattr__(self, item):
        attr = getattr(self._it, item)
        if callable(attr):
            def hooked(*args, **kwargs):
                with self._lock:
                    return attr(*args, **kwargs)
            # Cache the wrapper so __getattr__ only fires once per name.
            setattr(self, item, hooked)
            return hooked
        # BUG FIX: the original fell off the end for non-callable
        # attributes and implicitly returned None; forward the value.
        return attr
Your first attempt at making access to an iterator thread-safe seems quite reasonable. You can make it a bit more readable by using a generator:
def locked_iter(it):
    """Yield items from *it*, serializing each next() call with a lock.

    Wrapping the fetch (not the yield) in the lock means consumers in
    different threads never advance the underlying iterator concurrently.
    """
    source = iter(it)
    guard = threading.Lock()
    while True:
        with guard:
            try:
                value = next(source)
            except StopIteration:
                # Underlying iterator exhausted: end this generator too.
                return
        yield value
24 May 2012 – Bangalore
import threading


def count():
    """Infinite counting generator: 1, 2, 3, ... (not thread-safe)."""
    i = 0
    while True:
        i += 1
        yield i


class Counter:
    """Iterator equivalent of count() (not thread-safe)."""

    def __init__(self):
        self.i = 0

    def __iter__(self):
        return self

    def next(self):
        self.i += 1
        return self.i

    __next__ = next  # Python 3 iterator protocol


def loop(func, n):
    """Runs the given function n times in a loop."""
    for i in range(n):
        func()


def run(f, repeats=1000, nthreads=10):
    """Starts multiple threads to execute the given function multiple
    times in each thread."""
    # create threads
    threads = [threading.Thread(target=loop, args=(f, repeats))
               for i in range(nthreads)]
    # start threads
    for t in threads:
        t.start()
    # wait for threads to finish
    for t in threads:
        t.join()


def main():
    c1 = count()
    c2 = Counter()
    # call next(c1) 100K times in 2 different threads
    run(c1.__next__, repeats=100000, nthreads=2)
    print("c1", next(c1))
    # call c2.next 100K times in 2 different threads
    run(c2.next, repeats=100000, nthreads=2)
    print("c2", c2.next())


if __name__ == "__main__":
    main()
Exception in thread Thread-2:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
self.__target( * self.__args, ** self.__kwargs)
File "count.py", line 22, in loop
func()
ValueError: generator already executing
c1 112106
c2 158368
class Counter:
def __init__(self):
self.i = 0
# create a lock
self.lock = threading.Lock()
def __iter__(self):
return self
def next(self):
# acquire / release the lock when updating self.i
with self.lock:
self.i += 1
return self.i
import threading


def count():
    """Infinite counting generator: 1, 2, 3, ... (not thread-safe)."""
    i = 0
    while True:
        i += 1
        yield i


class Counter:
    """Iterator equivalent of count() (not thread-safe)."""

    def __init__(self):
        self.i = 0

    def __iter__(self):
        return self

    def next(self):
        self.i += 1
        return self.i

    __next__ = next  # Python 3 iterator protocol


def loop(func, n):
    """Runs the given function n times in a loop."""
    for i in range(n):
        func()


def run(f, repeats=1000, nthreads=10):
    """Starts multiple threads to execute the given function multiple
    times in each thread."""
    # create threads
    threads = [threading.Thread(target=loop, args=(f, repeats))
               for i in range(nthreads)]
    # start threads
    for t in threads:
        t.start()
    # wait for threads to finish
    for t in threads:
        t.join()


def main():
    c1 = count()
    c2 = Counter()
    # call next(c1) 100K times in 2 different threads
    run(c1.__next__, repeats=100000, nthreads=2)
    print("c1", next(c1))
    # call c2.next 100K times in 2 different threads
    run(c2.next, repeats=100000, nthreads=2)
    print("c2", c2.next())


if __name__ == "__main__":
    main()
Exception in thread Thread-2:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
self.__target( * self.__args, ** self.__kwargs)
File "count.py", line 22, in loop
func()
ValueError: generator already executing
c1 112106
c2 158368
class Counter:
def __init__(self):
self.i = 0
# create a lock
self.lock = threading.Lock()
def __iter__(self):
return self
def next(self):
# acquire / release the lock when updating self.i
with self.lock:
self.i += 1
return self.i
class threadsafe_iter:
    """Takes an iterator/generator and makes it thread-safe by
    serializing calls to the `next` method of the given iterator/generator.
    """

    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def next(self):
        with self.lock:
            # Builtin next() delegates to .next() on Py2 and .__next__()
            # on Py3 (the original self.it.next() was Py2-only).
            return next(self.it)

    __next__ = next  # Python 3 iterator protocol
# thread unsafe generator
c1 = count()
# now it is thread-safe
c1 = threadsafe_iter(c1)
class threadsafe_iter:
    """Takes an iterator/generator and makes it thread-safe by
    serializing calls to the `next` method of the given iterator/generator.
    """

    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def next(self):
        with self.lock:
            # Builtin next() delegates to .next() on Py2 and .__next__()
            # on Py3 (the original self.it.next() was Py2-only).
            return next(self.it)

    __next__ = next  # Python 3 iterator protocol
# thread unsafe generator
c1 = count()
# now it is thread-safe
c1 = threadsafe_iter(c1)
Make an iterator that returns accumulated sums, or accumulated results of other binary functions (specified via the optional func argument).

The following module functions all construct and return iterators. Some provide streams of infinite length, so they should only be accessed by functions or loops that truncate the stream.

Make an iterator that returns object over and over again. Runs indefinitely unless the times argument is specified. Used as an argument to map() for invariant parameters to the called function. Also used with zip() to create an invariant part of a tuple record.

See functools.reduce() for a similar function that returns only the final accumulated value.
def accumulate(iterable, func=operator.add, *, initial=None):
    """Return running totals.

    accumulate([1,2,3,4,5]) --> 1 3 6 10 15
    accumulate([1,2,3,4,5], initial=100) --> 100 101 103 106 110 115
    accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120
    """
    it = iter(iterable)
    total = initial
    if initial is None:
        # No seed given: the first element is the first running total.
        try:
            total = next(it)
        except StopIteration:
            return
    yield total
    for element in it:
        total = func(total, element)
        yield total
>>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8] >>> list(accumulate(data, operator.mul)) # running product[3, 12, 72, 144, 144, 1296, 0, 0, 0, 0] >>> list(accumulate(data, max)) # running maximum[3, 4, 6, 6, 6, 9, 9, 9, 9, 9] # Amortize a 5 % loan of 1000 with 4 annual payments of 90 >>> cashflows = [1000, -90, -90, -90, -90] >>> list(accumulate(cashflows, lambda bal, pmt: bal * 1.05 + pmt))[1000, 960.0, 918.0, 873.9000000000001, 827.5950000000001] # Chaotic recurrence relation https: //en.wikipedia.org/wiki/Logistic_map >>> logistic_map = lambda x, _: r * x * (1 - x) >>> r = 3.8 >>> x0 = 0.4 >>> inputs = repeat(x0, 36) # only the initial value is used >>> [format(x, '.2f') for x in accumulate(inputs, logistic_map)] ['0.40', '0.91', '0.30', '0.81', '0.60', '0.92', '0.29', '0.79', '0.63', '0.88', '0.39', '0.90', '0.33', '0.84', '0.52', '0.95', '0.18', '0.57', '0.93', '0.25', '0.71', '0.79', '0.63', '0.88', '0.39', '0.91', '0.32', '0.83', '0.54', '0.95', '0.20', '0.60', '0.91', '0.30', '0.80', '0.60' ]
def chain(*iterables):
    """Yield elements from each iterable in turn.

    chain('ABC', 'DEF') --> A B C D E F
    """
    for seq in iterables:
        yield from seq
def from_iterable(iterables):
    """Flatten one level of nesting from a single iterable of iterables.

    chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
    """
    for seq in iterables:
        yield from seq
def combinations(iterable, r):
    """Yield r-length combinations of *iterable*, in lexicographic order.

    combinations('ABCD', 2) --> AB AC AD BC BD CD
    combinations(range(4), 3) --> 012 013 023 123
    """
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        # Find the rightmost index that can still be advanced.
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            # Every index is at its maximum: all combinations emitted.
            return
        indices[i] += 1
        # Reset every index to the right of i to its smallest legal value.
        for j in range(i + 1, r):
            indices[j] = indices[j - 1] + 1
        yield tuple(pool[i] for i in indices)
def combinations(iterable, r):
    """Equivalent implementation of combinations() built on permutations().

    A permutation of indices is a combination exactly when its indices
    are already in sorted order; filtering on that yields each
    combination once, in lexicographic order.
    """
    pool = tuple(iterable)
    n = len(pool)
    for indices in permutations(range(n), r):
        if sorted(indices) == list(indices):
            yield tuple(pool[i] for i in indices)
This issue tracker has been migrated to GitHub, and is currently read-only. For more information, see the GitHub FAQs in the Python Developer's Guide.

Created on 2018-08-15 02:08 by carlorosati; last changed 2022-04-11 14:59 by admin. This issue is now closed.
Hello,
When I run the attached code, I encounter a segmentation fault.
Thanks,
Carlo
I figured out that the problem is itertools.tee does not use a multiprocessing.Manager proxied object for shared state.I was able to create a workaround tee as follows. def multiprocessing_tee(iterable, n = 2): "" "Write a multiprocessing safe itertools.tee" "" it = iter(iterable) m = multiprocessing.Manager() lists = [m.list() for i in range(n)] def gen(local_list): keep_m_alive = m while True: if not local_list: # when the local list is empty newval = next(it) # fetch a new value and for l in lists: # load it to all the lists l.append(newval) yield local_list.pop(-1) return tuple(gen(l) for l in lists)
Okay, I needed to do .pop(0) instead of .pop(-1), which is probably O(N).
You'll also need to lock when modifying the manager's list. Does anyone know how to do this using multiprocessing.Queues without deadlocking?
Davin, is there anything itertools.tee() can do about this or is this a multiprocessing issue ?
Last Updated : 21 Apr, 2022,GATE CS 2021 Syllabus
Syntax:
Iterator itr = c.iterator();
1. hasNext(): Returns true if the iteration has more elements.
public boolean hasNext();
2. next(): Returns the next element in the iteration. It throws NoSuchElementException if no more element is present.
public Object next();
In this section, we will try to understand how Java Iterator and its methods work internally. Let us take the following LinkedList object to understand this functionality.
List<String> cities = new LinkedList<>();
cities.add("G-1");
cities.add("G-2");
cities.add("G-3");
.
.
.
cities.add("G-n");
Now, let us create an Iterator object on List object as shown below:
Iterator<String> citiesIterator = cities.iterator();
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
0 1 2 3 4 5 6 7 8 9
[0, 2, 4, 6, 8]
java.util.Vector$1 java.util.Vector$Itr java.util.Vector$ListItr