Multithreading problems with xmlrpclib.ServerProxy under Python 2.7


source

import xmlrpclib
import threading

def fetch_users():
   proxy = xmlrpclib.ServerProxy("http://localhost:5000/")
   print proxy.getUsers()

for _ in range(10):
   threading.Thread(target=fetch_users, args=()).start()

# wait for all threads to exit (the main thread cannot join itself)
for th in threading.enumerate():
   if th is not threading.current_thread():
      th.join()

Suggestion : 2

This issue has been migrated to GitHub: https://github.com/python/cpython/issues/51698

In the past (<= 2.6), regrtest skipped a test if any import failure happened, which masked various real test failures. This was fixed, and tests that should be skipped if certain modules are not available were changed to use (test_)support.import_module, which causes a skip if that particular module cannot be imported.

If Python is compiled --without-threads, then the following tests currently 'crash' because they cannot import thread and/or threading:

   test_hashlib test_asyncore test_wait3 test_threading test_socket
test_wait4 test_capi test_xmlrpc test_ctypes
test_zipimport_support test_threading_local test_multiprocessing
test_file2k test_smtplib test_threadedtempfile test_threadsignals
test_thread test_queue test_asynchat test_contextlib
test_bz2 test_ftplib test_cmd test_pdb test_io test_doctest
test_sqlite test_logging test_telnetlib test_threaded_import
test_httpservers test_fork1 test_docxmlrpc test_urllib2_localnet
test_poplib test_socketserver

All of these tests should either be changed to use import_module when importing thread/threading, or changed so that the tests requiring thread support are skipped if thread support is not available.

Note that test_bsddb3 also fails, but it is not an import error crash.
The patch makes it so that tests that use threading skip rather than generate errors when Python is compiled --without-threads.

I added a skip_if_no('modulename') decorator to test_support.
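
The patch itself is not reproduced here, but based on the description later in this thread (the decorator attempts to import the module and raises SkipTest on ImportError), a minimal sketch of such a skip_if_no() helper could look like this; the body and the example test name are assumptions, not the actual patch:

import functools
import unittest

def skip_if_no(module_name):
   """Skip the decorated test if module_name cannot be imported."""
   def decorator(func):
      @functools.wraps(func)
      def wrapper(*args, **kwargs):
         try:
            __import__(module_name)
         except ImportError:
            raise unittest.SkipTest("could not import %r" % module_name)
         return func(*args, **kwargs)
      return wrapper
   return decorator

# Illustrative usage:
# @skip_if_no('threading')
# def test_concurrent_requests(self):
#     ...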

Let me know if this patch is too big to review and I'll break it up. It is 800 lines.

One thing to note: when threading is disabled, all sqlite3 tests are skipped. There are a few test files that do not use threading, but I couldn't think of an easy way to split them out.

Tested on OS X and Linux.
Comments about nothreads.patch. There are unrelated changes:

Lib/test/test_xmlrpc.py:
-    p = xmlrpclib.ServerProxy(URL, transport=t)
+    p = xmlrpclib.ServerProxy('http:', transport=t)

Lib/test/test_macostools.py (working copy):
-        try:
-            os.unlink(TESTFN2)
-        except:
-            pass

Or you should explain to me why you changed that :-)

Why not use test_support.import_module('threading') in test_xmlrpc.py, test_file2k.py, etc.?

I don't like importing a module in a function: you should use test_support.import_module('threading') in test_io.py, test_logging.py, test_capi.py, etc.

The skip_if_no() function name is a little bit too generic. I don't have any suggestion.
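
For comparison, a minimal sketch of the module-level test_support.import_module pattern the reviewer is suggesting; the test class and assertions are illustrative only:

import unittest
from test import test_support   # "from test import support" on Python 3

# Raises SkipTest at import time if threading is unavailable,
# so the whole test module is skipped.
threading = test_support.import_module('threading')

class ThreadedSmokeTest(unittest.TestCase):
   def test_thread_runs(self):
      results = []
      t = threading.Thread(target=lambda: results.append(42))
      t.start()
      t.join()
      self.assertEqual(results, [42])

if __name__ == '__main__':
   unittest.main()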
In the test_xmlrpc.py case I changed the value from URL to 'http:' because the code that sets URL to a valid url relies on threading. When threading is disabled, URL is set to None and the test will fail. The two ServerProxy test cases that were modified in this way do not actually use the network at all. They instead test that the close() method returns None and that the transport() method returns the transport passed in to the constructor. I figured setting the url to 'http:' instead of an empty string was more readable. The reader would know that the string was supposed to be a url and that it was utterly meaningless in this case.

In the test_macostools.py case, the os.unlink(TESTFN2) call is a copy-and-paste error from the previous test. This test tried to remove an alias it never created, and it failed to check that the destination directory for the alias actually was a directory (it only checked that the path existed - in my case it was a file, not a directory). I fixed the test to check that sys.prefix is a directory, and then clean up sys.prefix/TESTFN2.

The skip_if_no decorator is not absolutely necessary and could have been skipped. I believe it adds to the readability of the code, because with the decorator it becomes obvious that the test should skip in some cases. Perhaps this is what import_module() is for - if so, should I document it? I also believe the decorator helps prevent cases where a resource is allocated (like creating a directory), then the import_module() call fails and a test artifact is left lying around on disk. Having said that, I do not know if this actually happens in any of the tests, so it might be a moot point.

In reference to disliking the naming of skip_if_no(), I do not like the naming either. The decorator attempts to import the module, then raises SkipTest if there was an ImportError. I think it is essential to have the words "import" and "skip" in the method name to help indicate what the decorator does. These are names I could live with:

   import_or_skip_test('threading')
   import_module_or_skip_test('threading')
   skip_test_unless_import('threading')

My preference is for the last one. Let me know which one you like best and I'll change the name to that.

I haven't reviewed the whole patch, but I would suggest that instead of making a test_support.skip_if_no (or any other name), you use @unittest.skipUnless(threading) (note the lack of quotes around threading... you seem to already be getting it defined correctly in all the modules I scanned). That's what other test modules do in similar situations.
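
A minimal sketch of the @unittest.skipUnless(threading) pattern being recommended; the guarded import and the test body are illustrative:

import unittest

try:
   import threading
except ImportError:
   threading = None

class ThreadingDependentTests(unittest.TestCase):

   @unittest.skipUnless(threading, 'test requires the threading module')
   def test_worker_thread(self):
      results = []
      t = threading.Thread(target=lambda: results.append(1))
      t.start()
      t.join()
      self.assertEqual(results, [1])

if __name__ == '__main__':
   unittest.main()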

Suggestion : 3

A ServerProxy instance is an object that manages communication with a remote XML-RPC server. The required first argument is a URI (Uniform Resource Indicator), and will normally be the URL of the server. The optional second argument is a transport factory instance; by default it is an internal SafeTransport instance for https: URLs and an internal HTTP Transport instance otherwise. The optional third argument is an encoding, by default UTF-8. The optional fourth argument is a debugging flag. Changed in version 3.5: instances of ServerProxy support the context manager protocol for closing the underlying transport. Servers that support the XML introspection API support some common methods grouped under the reserved system attribute. The encoded data will have newlines every 76 characters as per RFC 2045 section 6.8, which was the de facto standard base64 specification when the XML-RPC spec was written.

# Server
from xmlrpc.server import SimpleXMLRPCServer

def is_even(n):
   return n % 2 == 0

server = SimpleXMLRPCServer(("localhost", 8000))
print("Listening on port 8000...")
server.register_function(is_even, "is_even")
server.serve_forever()

# Client
import xmlrpc.client

with xmlrpc.client.ServerProxy("http://localhost:8000/") as proxy:
   print("3 is even: %s" % str(proxy.is_even(3)))
   print("100 is even: %s" % str(proxy.is_even(100)))
# Server
import datetime
from xmlrpc.server import SimpleXMLRPCServer
import xmlrpc.client

def today():
   today = datetime.datetime.today()
   return xmlrpc.client.DateTime(today)

server = SimpleXMLRPCServer(("localhost", 8000))
print("Listening on port 8000...")
server.register_function(today, "today")
server.serve_forever()

# Client
import xmlrpc.client
import datetime

proxy = xmlrpc.client.ServerProxy("http://localhost:8000/")

today = proxy.today()
# convert the ISO8601 string to a datetime object
converted = datetime.datetime.strptime(today.value, "%Y%m%dT%H:%M:%S")
print("Today: %s" % converted.strftime("%d.%m.%Y, %H:%M"))
# Server
from xmlrpc.server import SimpleXMLRPCServer
import xmlrpc.client

def python_logo():
   with open("python_logo.jpg", "rb") as handle:
      return xmlrpc.client.Binary(handle.read())

server = SimpleXMLRPCServer(("localhost", 8000))
print("Listening on port 8000...")
server.register_function(python_logo, 'python_logo')

server.serve_forever()

# Client
import xmlrpc.client

proxy = xmlrpc.client.ServerProxy("http://localhost:8000/")
with open("fetched_python_logo.jpg", "wb") as handle:
   handle.write(proxy.python_logo().data)

Suggestion : 4

The multiprocessing module is easier to drop in than the threading module, as we don't need to add a class like in the Python threading example. The only changes we need to make are in the main function. Using a concurrent.futures.ThreadPoolExecutor makes the Python threading example code almost identical to the multiprocessing module. In the original article, I mentioned that Python's multiprocessing module would be easier to drop into existing code than the threading module. This was because the Python 3 threading module required subclassing the Thread class and also creating a Queue for the threads to monitor for work. Threading is one of the most well-known approaches to attaining Python concurrency and parallelism. Threading is a feature usually provided by the operating system. Threads are lighter than processes, and share the same memory space.

This is what the script looks like:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

types = {
   'image/jpeg',
   'image/png'
}

def get_links(client_id):
   headers = {'Authorization': 'Client-ID {}'.format(client_id)}
   req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
   with urlopen(req) as resp:
      data = json.loads(resp.read().decode('utf-8'))
   return [item['link'] for item in data['data']
           if 'type' in item and item['type'] in types]

def download_link(directory, link):
   download_path = directory / os.path.basename(link)
   with urlopen(link) as image, download_path.open('wb') as f:
      f.write(image.read())
   logger.info('Downloaded %s', link)

def setup_download_dir():
   download_dir = Path('images')
   if not download_dir.exists():
      download_dir.mkdir()
   return download_dir

Next, we will need to write a module that will use these functions to download the images, one by one. We will name this single.py. This will contain the main function of our first, naive version of the Imgur image downloader. The module will retrieve the Imgur client ID in the environment variable IMGUR_CLIENT_ID. It will invoke the setup_download_dir to create the download destination directory. Finally, it will fetch a list of images using the get_links function, filter out all GIF and album URLs, and then use download_link to download and save each of those images to the disk. Here is what single.py looks like:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
      raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = get_links(client_id)
   for link in links:
      download_link(download_dir, link)
   logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
   main()

This is almost the same as the previous one, with the exception that we now have a new class, DownloadWorker, which is a descendant of the Python Thread class. The run method has been overridden, which runs an infinite loop. On every iteration, it calls self.queue.get() to try and fetch a URL from a thread-safe queue. It blocks until there is an item in the queue for the worker to process. Once the worker receives an item from the queue, it then calls the same download_link method that was used in the previous script to download the image to the images directory. After the download is finished, the worker signals the queue that the task is done. This is very important, because the Queue keeps track of how many tasks were enqueued. The call to queue.join() would block the main thread forever if the workers did not signal that they completed a task.

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

class DownloadWorker(Thread):

   def __init__(self, queue):
      Thread.__init__(self)
      self.queue = queue

   def run(self):
      while True:
         # Get the work from the queue and expand the tuple
         directory, link = self.queue.get()
         try:
            download_link(directory, link)
         finally:
            self.queue.task_done()

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
      raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = get_links(client_id)
   # Create a queue to communicate with the worker threads
   queue = Queue()
   # Create 8 worker threads
   for x in range(8):
      worker = DownloadWorker(queue)
      # Setting daemon to True will let the main thread exit even though the workers are blocking
      worker.daemon = True
      worker.start()
   # Put the tasks into the queue as a tuple
   for link in links:
      logger.info('Queueing {}'.format(link))
      queue.put((download_dir, link))
   # Causes the main thread to wait for the queue to finish processing all the tasks
   queue.join()
   logging.info('Took %s', time() - ts)

if __name__ == '__main__':
   main()
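
The introduction to this suggestion also notes that the multiprocessing module can be dropped in with changes to the main function only. A rough sketch of that adaptation (not the article's exact code), reusing the same download.py helpers:

import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
      raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = get_links(client_id)
   # Fan the downloads out over a pool of worker processes.
   download = partial(download_link, download_dir)
   with Pool(4) as p:
      p.map(download, links)
   logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
   main()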

One last step we need to do is to start up some workers. RQ provides a handy script to run workers on the default queue. Just run rqworker in a terminal window and it will start a worker listening on the default queue. Please make sure your current working directory is the same as where the scripts reside. If you want to listen to a different queue, you can run rqworker queue_name and it will listen to that named queue. The great thing about RQ is that as long as you can connect to Redis, you can run as many workers as you like on as many different machines as you like; therefore, it is very easy to scale up as your application grows. Here is the source for the RQ version:

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
      raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = get_links(client_id)
   q = Queue(connection=Redis(host='localhost', port=6379))
   for link in links:
      q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
   main()

Using a concurrent.futures.ThreadPoolExecutor makes the Python threading example code almost identical to the multiprocessing module.

import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

def main():
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
      raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = get_links(client_id)

   # By placing the executor inside a with block, the executor's shutdown
   # method will be called, cleaning up threads.
   #
   # By default, the executor sets the number of workers to 5 times the
   # number of CPUs.
   with ThreadPoolExecutor() as executor:

      # Create a new partially applied function that stores the directory
      # argument.
      #
      # This allows the download_link function that normally takes two
      # arguments to work with the map function that expects a function of a
      # single argument.
      fn = partial(download_link, download_dir)

      # Executes fn concurrently using threads on the links iterable. The
      # timeout is for the entire process, not a single call, so downloading
      # all images must complete within 30 seconds.
      executor.map(fn, links, timeout=30)

if __name__ == '__main__':
   main()
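
One caveat about the block above: executor.map returns a lazy iterator, so exceptions raised inside download_link, as well as the 30-second timeout, only take effect when the results are consumed. An illustrative variant of the with block that surfaces them:

   with ThreadPoolExecutor() as executor:
      fn = partial(download_link, download_dir)
      # Iterating over the results propagates worker exceptions and
      # enforces the 30-second timeout.
      for _ in executor.map(fn, links, timeout=30):
         pass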