2 Threads Write 8 Threads Read Best Optimization

Note: By popular request that I demonstrate some alternative techniques, including async/await, only available since the advent of Python 3.5, I've added some updates at the end of the article. Enjoy!

Discussions criticizing Python often talk about how it is hard to use Python for multithreaded work, pointing fingers at what is known as the global interpreter lock (affectionately referred to as the GIL) that prevents multiple threads of Python code from running simultaneously. Because of this, the Python multithreading module doesn't quite behave the way you would expect it to if you're not a Python developer and you are coming from other languages such as C++ or Java. It must be made clear that one can still write code in Python that runs concurrently or in parallel and makes a stark difference in resulting performance, as long as certain things are taken into consideration. If you haven't read it yet, I suggest you take a look at Eqbal Quran's article on concurrency and parallelism in Ruby here on the Toptal Engineering Blog.

In this Python concurrency tutorial, we will write a small Python script to download the top popular images from Imgur. We will start with a version that downloads images sequentially, or one at a time. As a prerequisite, you will have to register an application on Imgur. If you do not have an Imgur account already, please create one first.

The scripts in these threading examples have been tested with Python 3.6.4. With some changes, they should also run with Python 2; urllib is what has changed the most between these two versions of Python.

Getting Started with Python Multithreading

Let us start by creating a Python module named download.py. This file will contain all the functions necessary to fetch the list of images and download them. We will split these functionalities into three separate functions:

  • get_links
  • download_link
  • setup_download_dir

The third function, setup_download_dir, will be used to create a download destination directory if it doesn't already exist.

Imgur's API requires HTTP requests to carry the Authorization header with the client ID. You can find this client ID from the dashboard of the application that you have registered on Imgur, and the response will be JSON encoded. We can use Python's standard JSON library to decode it. Downloading the image is an even simpler task, as all you have to do is fetch the image by its URL and write it to a file.

This is what the script looks like:

    import json
    import logging
    import os
    from pathlib import Path
    from urllib.request import urlopen, Request

    logger = logging.getLogger(__name__)

    types = {'image/jpeg', 'image/png'}


    def get_links(client_id):
        headers = {'Authorization': 'Client-ID {}'.format(client_id)}
        req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
        with urlopen(req) as resp:
            data = json.loads(resp.read().decode('utf-8'))
        return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types]


    def download_link(directory, link):
        download_path = directory / os.path.basename(link)
        with urlopen(link) as image, download_path.open('wb') as f:
            f.write(image.read())
        logger.info('Downloaded %s', link)


    def setup_download_dir():
        download_dir = Path('images')
        if not download_dir.exists():
            download_dir.mkdir()
        return download_dir

Next, we will need to write a module that will use these functions to download the images, one by one. We will name this single.py. This will contain the main function of our first, naive version of the Imgur image downloader. The module will retrieve the Imgur client ID from the environment variable IMGUR_CLIENT_ID. It will invoke setup_download_dir to create the download destination directory. Finally, it will fetch a list of images using the get_links function, filter out all GIF and album URLs, and then use download_link to download and save each of those images to the disk. Here is what single.py looks like:

    import logging
    import os
    from time import time

    from download import setup_download_dir, get_links, download_link

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    def main():
        ts = time()
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)
        for link in links:
            download_link(download_dir, link)
        logging.info('Took %s seconds', time() - ts)

    if __name__ == '__main__':
        main()

On my laptop, this script took 19.4 seconds to download 91 images. Please do note that these numbers may vary based on the network you are on. 19.4 seconds isn't terribly long, but what if we wanted to download more pictures? Perhaps 900 images, instead of 90. With an average of 0.2 seconds per picture, 900 images would take approximately 3 minutes. For 9,000 pictures it would take 30 minutes. The good news is that by introducing concurrency or parallelism, we can speed this up dramatically.

All subsequent code examples will only show import statements that are new and specific to those examples. For convenience, all of these Python scripts can be found in this GitHub repository.

Concurrency and Parallelism in Python: Threading Example

Threading is one of the most well-known approaches to attaining Python concurrency and parallelism. Threading is a feature usually provided by the operating system. Threads are lighter than processes, and share the same memory space.

Python multithreading memory model
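As a quick illustration of that last point, here is a minimal sketch (not part of the tutorial's scripts) showing that a thread mutates the very same list the main thread sees, while a child process only changes its own copy:

    from multiprocessing import Process
    from threading import Thread

    values = []

    def append_one():
        values.append(1)

    if __name__ == '__main__':
        t = Thread(target=append_one)
        t.start()
        t.join()
        print('after thread:', values)   # [1] - the thread shares our memory space

        p = Process(target=append_one)
        p.start()
        p.join()
        print('after process:', values)  # still [1] - the process modified its own copy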

In this Python threading example, we will write a new module to replace single.py. This module will create a pool of eight threads, making a total of nine threads including the main thread. I chose eight worker threads because my computer has eight CPU cores and one worker thread per core seemed a good number for how many threads to run at once. In practice, this number is chosen much more carefully based on other factors, such as other applications and services running on the same machine.

This is almost the same as the previous one, with the exception that we now have a new class, DownloadWorker, which is a descendant of the Python Thread class. The run method has been overridden, which runs an infinite loop. On every iteration, it calls self.queue.get() to try to fetch a URL from a thread-safe queue. It blocks until there is an item in the queue for the worker to process. Once the worker receives an item from the queue, it then calls the same download_link method that was used in the previous script to download the image to the images directory. After the download is finished, the worker signals the queue that that task is done. This is very important, because the Queue keeps track of how many tasks were enqueued. The call to queue.join() would block the main thread forever if the workers did not signal that they completed a task.

    import logging
    import os
    from queue import Queue
    from threading import Thread
    from time import time

    from download import setup_download_dir, get_links, download_link


    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    logger = logging.getLogger(__name__)


    class DownloadWorker(Thread):

        def __init__(self, queue):
            Thread.__init__(self)
            self.queue = queue

        def run(self):
            while True:
                # Get the work from the queue and expand the tuple
                directory, link = self.queue.get()
                try:
                    download_link(directory, link)
                finally:
                    self.queue.task_done()


    def main():
        ts = time()
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)
        # Create a queue to communicate with the worker threads
        queue = Queue()
        # Create 8 worker threads
        for x in range(8):
            worker = DownloadWorker(queue)
            # Setting daemon to True will let the main thread exit even though the workers are blocking
            worker.daemon = True
            worker.start()
        # Put the tasks into the queue as a tuple
        for link in links:
            logger.info('Queueing {}'.format(link))
            queue.put((download_dir, link))
        # Causes the main thread to wait for the queue to finish processing all the tasks
        queue.join()
        logging.info('Took %s', time() - ts)

    if __name__ == '__main__':
        main()

Running this Python threading example script on the same machine used earlier results in a download time of 4.1 seconds! That's 4.7 times faster than the previous example. While this is much faster, it is worth mentioning that only one thread was executing at a time throughout this process due to the GIL. Therefore, this code is concurrent but not parallel. The reason it is still faster is because this is an IO bound task. The processor is hardly breaking a sweat while downloading these images, and the majority of the time is spent waiting for the network. This is why Python multithreading can provide a large speed increase. The processor can switch between the threads whenever one of them is ready to do some work. Using the threading module in Python or any other interpreted language with a GIL can actually result in reduced performance. If your code is performing a CPU bound task, such as decompressing gzip files, using the threading module will result in a slower execution time. For CPU bound tasks and truly parallel execution, we can use the multiprocessing module.
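To see this effect for yourself, here is a minimal sketch (not from the original article, and using a contrived pure-Python countdown rather than gzip) that times a CPU bound task run sequentially and then split across four threads. Under CPython's GIL the threaded version is no faster, and is often slightly slower because of the extra thread-switching overhead:

    from threading import Thread
    from time import time

    def countdown(n):
        # Pure-Python CPU bound loop; it holds the GIL the entire time it runs
        while n > 0:
            n -= 1

    TOTAL = 20_000_000

    # Sequential: one thread does all the work
    start = time()
    countdown(TOTAL)
    print('sequential: %.2f seconds' % (time() - start))

    # Threaded: four threads each do a quarter of the work, but only one can
    # execute Python bytecode at a time, so there is no real speedup
    start = time()
    threads = [Thread(target=countdown, args=(TOTAL // 4,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('threaded:   %.2f seconds' % (time() - start))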

While the de facto reference Python implementation, CPython, has a GIL, this is not true of all Python implementations. For example, IronPython, a Python implementation using the .NET framework, does not have a GIL, and neither does Jython, the Java-based implementation. You can find a list of working Python implementations here.

Concurrency and Parallelism in Python Example 2: Spawning Multiple Processes

The multiprocessing module is easier to drop in than the threading module, as we don't need to add a class like in the Python threading example. The only changes we need to make are in the main function.

Python multiprocessing tutorial: Modules

To use multiple processes, we create a multiprocessing Pool. With the map method it provides, we will pass the list of URLs to the pool, which in turn will spawn new worker processes and use each one to download the images in parallel. This is true parallelism, but it comes with a cost. The entire memory of the script is copied into each subprocess that is spawned. In this simple example, it isn't a big deal, but it can easily become serious overhead for non-trivial programs.

    import logging
    import os
    from functools import partial
    from multiprocessing.pool import Pool
    from time import time

    from download import setup_download_dir, get_links, download_link


    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logging.getLogger('requests').setLevel(logging.CRITICAL)
    logger = logging.getLogger(__name__)


    def main():
        ts = time()
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)
        download = partial(download_link, download_dir)
        with Pool(4) as p:
            p.map(download, links)
        logging.info('Took %s seconds', time() - ts)


    if __name__ == '__main__':
        main()

Concurrency and Parallelism in Python Example 3: Distributing to Multiple Workers

While the threading and multiprocessing modules are great for scripts that are running on your personal computer, what should you do if you want the work to be done on a different machine, or you need to scale up to more than the CPUs on one machine can handle? A great use case for this is long-running back-end tasks for web applications. If you have some long-running tasks, you don't want to spin up a bunch of sub-processes or threads on the same machine that needs to be running the rest of your application code. This will degrade the performance of your application for all of your users. What would be great is to be able to run these jobs on another machine, or many other machines.

A great Python library for this task is RQ, a very simple yet powerful library. You first enqueue a function and its arguments using the library. This pickles the function call representation, which is then appended to a Redis list. Enqueueing the job is the first step, but will not do anything yet. We also need at least one worker to listen on that job queue.

Model of the RQ Python queue library

The first step is to install and run a Redis server on your computer, or have access to a running Redis server. After that, there are only a few small changes made to the existing code. We first create an instance of an RQ Queue and pass it an instance of a Redis server from the redis-py library. Then, instead of just calling our download_link method, we call q.enqueue(download_link, download_dir, link). The enqueue method takes a function as its first argument, and then any other arguments or keyword arguments are passed along to that function when the job is actually executed.

One last step we need to do is to start up some workers. RQ provides a handy script to run workers on the default queue. Just run rqworker in a terminal window and it will start a worker listening on the default queue. Please make sure your current working directory is the same as where the scripts reside. If you want to listen to a different queue, you can run rqworker queue_name and it will listen to that named queue. The great thing about RQ is that as long as you can connect to Redis, you can run as many workers as you like on as many different machines as you like; therefore, it is very easy to scale up as your application grows. Here is the source for the RQ version:

    import logging
    import os

    from redis import Redis

    from rq import Queue

    from download import setup_download_dir, get_links, download_link


    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logging.getLogger('requests').setLevel(logging.CRITICAL)
    logger = logging.getLogger(__name__)


    def main():
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)
        q = Queue(connection=Redis(host='localhost', port=6379))
        for link in links:
            q.enqueue(download_link, download_dir, link)

    if __name__ == '__main__':
        main()

However, RQ is not the only Python job queue solution. RQ is easy to use and covers simple use cases extremely well, but if more advanced options are required, other Python 3 queue solutions (such as Celery) can be used.

Python Multithreading vs. Multiprocessing

If your code is IO bound, both multiprocessing and multithreading in Python will work for you. Multiprocessing is easier to just drop in than threading but has a higher memory overhead. If your code is CPU bound, multiprocessing is most likely going to be the better choice, especially if the target machine has multiple cores or CPUs. For web applications, and when you need to scale the work across multiple machines, RQ is going to be better for you.


Update

Python concurrent.futures

Something new since Python 3.2 that wasn't touched upon in the original article is the concurrent.futures package. This package provides yet another way to use concurrency and parallelism with Python.

In the original article, I mentioned that Python's multiprocessing module would be easier to drop into existing code than the threading module. This was because the Python 3 threading module required subclassing the Thread class and also creating a Queue for the threads to monitor for work.

Using a concurrent.futures.ThreadPoolExecutor makes the Python threading example code almost identical to the multiprocessing module.

    import logging
    import os
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial
    from time import time

    from download import setup_download_dir, get_links, download_link

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    logger = logging.getLogger(__name__)


    def main():
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        links = get_links(client_id)

        # By placing the executor inside a with block, the executor's shutdown method
        # will be called, cleaning up threads.
        #
        # By default, the executor sets the number of workers to 5 times the number of
        # CPUs.
        with ThreadPoolExecutor() as executor:

            # Create a new partially applied function that stores the directory
            # argument.
            #
            # This allows the download_link function that normally takes two
            # arguments to work with the map function that expects a function of a
            # single argument.
            fn = partial(download_link, download_dir)

            # Executes fn concurrently using threads on the links iterable. The
            # timeout is for the entire process, not a single call, so downloading
            # all images must complete within 30 seconds.
            executor.map(fn, links, timeout=30)


    if __name__ == '__main__':
        main()

Now that we have all these images downloaded with our Python ThreadPoolExecutor, we can use them to test a CPU-bound task. We can create thumbnail versions of all the images in a single-threaded, single-process script and then test a multiprocessing-based solution.

We are going to use the Pillow library to handle the resizing of the images.

Here is our initial script.

    import logging
    from pathlib import Path
    from time import time

    from PIL import Image

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    logger = logging.getLogger(__name__)


    def create_thumbnail(size, path):
        """
        Creates a thumbnail of an image with the same name as image but with
        _thumbnail appended before the extension. E.g.:

        >>> create_thumbnail((128, 128), 'image.jpg')

        A new thumbnail image is created with the name image_thumbnail.jpg

        :param size: A tuple of the width and height of the image
        :param path: The path to the image file
        :return: None
        """
        image = Image.open(path)
        image.thumbnail(size)
        path = Path(path)
        name = path.stem + '_thumbnail' + path.suffix
        thumbnail_path = path.with_name(name)
        image.save(thumbnail_path)


    def main():
        ts = time()
        for image_path in Path('images').iterdir():
            create_thumbnail((128, 128), image_path)
        logging.info('Took %s', time() - ts)


    if __name__ == '__main__':
        main()

This script iterates over the paths in the images folder, and for each path it runs the create_thumbnail function. This function uses Pillow to open the image, create a thumbnail, and save the new, smaller image with the same name as the original but with _thumbnail appended to the name.

Running this script on 160 images totaling 36 million pixels takes 2.32 seconds. Let's see if we can speed this up using a ProcessPoolExecutor.

    import logging
    from pathlib import Path
    from time import time
    from functools import partial

    from concurrent.futures import ProcessPoolExecutor

    from PIL import Image

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    logger = logging.getLogger(__name__)


    def create_thumbnail(size, path):
        """
        Creates a thumbnail of an image with the same name as image but with
        _thumbnail appended before the extension. E.g.:

        >>> create_thumbnail((128, 128), 'image.jpg')

        A new thumbnail image is created with the name image_thumbnail.jpg

        :param size: A tuple of the width and height of the image
        :param path: The path to the image file
        :return: None
        """
        path = Path(path)
        name = path.stem + '_thumbnail' + path.suffix
        thumbnail_path = path.with_name(name)
        image = Image.open(path)
        image.thumbnail(size)
        image.save(thumbnail_path)


    def main():
        ts = time()
        # Partially apply the create_thumbnail method, setting the size to 128x128
        # and returning a function of a single argument.
        thumbnail_128 = partial(create_thumbnail, (128, 128))

        # Create the executor in a with block so shutdown is called when the block
        # is exited.
        with ProcessPoolExecutor() as executor:
            executor.map(thumbnail_128, Path('images').iterdir())
        logging.info('Took %s', time() - ts)


    if __name__ == '__main__':
        main()

The create_thumbnail method is identical to the last script. The main difference is the creation of a ProcessPoolExecutor. The executor's map method is used to create the thumbnails in parallel. By default, the ProcessPoolExecutor creates one subprocess per CPU. Running this script on the same 160 images took 1.05 seconds, 2.2 times faster!

Async/Await (Python 3.5+ Only)

One of the most requested items in the comments on the original article was for an example using Python 3's asyncio module. Compared to the other examples, there is some new Python syntax that may be new to most people and also some new concepts. An unfortunate additional layer of complexity is caused by Python's built-in urllib module not being asynchronous. We will need to use an async HTTP library to get the full benefits of asyncio. For this, we'll use aiohttp.

Allow'south leap right into the code and a more than detailed caption will follow.

    import asyncio
    import logging
    import os
    from time import time

    import aiohttp

    from download import setup_download_dir, get_links

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)


    async def async_download_link(session, directory, link):
        """
        Async version of the download_link method we've been using in the other examples.
        :param session: aiohttp ClientSession
        :param directory: directory to save downloads
        :param link: the url of the link to download
        :return:
        """
        download_path = directory / os.path.basename(link)
        async with session.get(link) as response:
            with download_path.open('wb') as f:
                while True:
                    # await pauses execution until the 1024 (or fewer) bytes are read from the stream
                    chunk = await response.content.read(1024)
                    if not chunk:
                        # We are done reading the file, break out of the while loop
                        break
                    f.write(chunk)
        logger.info('Downloaded %s', link)


    # Main is now a coroutine
    async def main():
        client_id = os.getenv('IMGUR_CLIENT_ID')
        if not client_id:
            raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
        download_dir = setup_download_dir()
        # We use a session to take advantage of tcp keep-alive
        # Set a 3 second read and connect timeout. Default is 5 minutes
        async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session:
            tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)]
            # gather aggregates all the tasks and schedules them in the event loop
            await asyncio.gather(*tasks, return_exceptions=True)


    if __name__ == '__main__':
        ts = time()
        # Create the asyncio event loop
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            # Shutdown the loop even if there is an exception
            loop.close()
        logger.info('Took %s seconds to complete', time() - ts)

There is quite a bit to unpack here. Let's start with the main entry point of the program. The first new thing we do with the asyncio module is to obtain the event loop. The event loop handles all of the asynchronous code. Then, the loop is run until complete and passed the main function. There is a piece of new syntax in the definition of main: async def. You'll also notice await and with async.

The async/await syntax was introduced in PEP 492. The async def syntax marks a function as a coroutine. Internally, coroutines are based on Python generators, but aren't exactly the same thing. Coroutines return a coroutine object similar to how generators return a generator object. Once you have a coroutine, you obtain its results with the await expression. When a coroutine calls await, execution of the coroutine is suspended until the awaitable completes. This suspension allows other work to be completed while the coroutine is suspended "awaiting" some result. In general, this result will be some kind of I/O like a database request or in our case an HTTP request.
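If the syntax is new to you, here is a minimal, self-contained sketch (not part of the downloader) that uses only the standard library. Two coroutines each await a one-second sleep; because the event loop runs other work while a coroutine is suspended, the whole thing takes about one second rather than two:

    import asyncio
    from time import time


    async def wait_and_return(label, seconds):
        # await suspends this coroutine; the event loop is free to run others meanwhile
        await asyncio.sleep(seconds)
        return label


    async def main():
        # gather schedules both coroutines and collects their results in order
        results = await asyncio.gather(
            wait_and_return('first', 1),
            wait_and_return('second', 1),
        )
        print(results)  # ['first', 'second']


    if __name__ == '__main__':
        ts = time()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        print('Took %.2f seconds' % (time() - ts))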

The download_link function had to be changed pretty significantly. Previously, we were relying on urllib to do the brunt of the work of reading the image for us. Now, to allow our method to work properly with the async programming paradigm, we've introduced a while loop that reads chunks of the image at a time and suspends execution while waiting for the I/O to complete. This allows the event loop to loop through downloading the different images as each one has new data available during the download.

There Should Be One, Preferably Only One, Obvious Way to Do It

While the Zen of Python tells us there should be one obvious way to do something, there are many ways in Python to introduce concurrency into our programs. The best method to choose is going to depend on your specific use case. The asynchronous paradigm scales better to high-concurrency workloads (like a webserver) compared to threading or multiprocessing, but it requires your code (and dependencies) to be async in order to fully benefit.

Hopefully the Python threading examples in this article (and update) will point you in the right direction so you have an idea of where to look in the Python standard library if you need to introduce concurrency into your programs.


Source: https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python
