Многопоточность в одну строку

Managing Your Threads

Python has a lot of useful methods, variables and parameter to manage your threads. These are some of the basic ones.

Naming

Setting names for threads can be quite useful for identifying them. To do this, when creating a thread call and pass a string. For example:

Now to get the name of the thread just call on the thread object. There are a few ways of getting the thread object depending on how you stored it. If you are inside the thread and you’re in a threading class, you can just use ; for example:

If you want to get it outside of the class, you can just get the variable you saved it under and use that. For example, when I saved the threads in a list before, I can just use an element from it as they are the threading objects I want. The following example will print the name of the first thread added to this list.

Joining Threads

Joining a thread allow us to wait until it is terminated before we carry on. For example:

In this example, I create the thread and start it. I then call join which waits until the thread is finished. If the thread called does not finish, the thread that called the method (can be the main thread) will wait indefinitely. This means that you shouldn’t use .join() on threads that don’t finish unless you want to stop the execution of the thread for another reason.

You can wait for many threads to finish by calling .join() after you started them all. The following example will wait until all 4 threads are finished before it goes to the print statement.

Daemon Threads

A daemon thread is a thread that will not keep running if the rest of the script has stopped and there are no more non-daemon threads left. We can tell the main thread is non-daemon as it is not abruptly stopped when it’s the only thread running.

This means setting a threads daemon value to will mean that it will not keep running after the main thread has finished (or other non-daemon threads); we can set to to make sure the thread keeps running even when the main thread finishes. The daemon value must be set before is called on the thread. To set if a thread is daemon, you can pass (or ) in the same place I put args before:

You can alternatively set a thread to be daemon using on the thread. If you are threading a class, you can call in the initialisation method or on the thread like if you were threading a method. For example:

To check if a thread is daemon, you can check on the thread; this is a bool.

Getting All Threads

If you want to get all the threads that are currently alive, you can call . This will return all alive threads including the main thread in a list. If you want to get all the names of the current alive threads, use:

Is a Thread Alive?

If you want to check if a particular thread is still alive, you can call on the thread. This will return true if the thread is still running.

Архитектура TCP-сервера

В предыдущем простом примере сервера вы успели заметить, что функция блокирует приложение. Пример банальный, рассчитан на одного клиента, и к реальной жизни имеет отдаленное отношение. Реальные серверы имеют нагрузку в несколько тысяч клиентов, и нужна более серьезная реализация. Для этого существует несколько различных подходов:

  1. использование отдельного потока на каждого клиента;
  2. использование неблокирующих сокетов;
  3. использование select/poll.

В Python неблокирующий сокет реализуется с помощью специального метода с параметром, равным нулю.

Пример:

lstn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cs = []
nc = 2
for i in range(nc):
   (clnt,ap) = lstn.accept()
   clnt.setblocking(0)
   cs.append(clnt)

Недостаток этого метода в том, что нам вручную нужно проверять готовность каждого клиента. Третий вариант с использованием позволяет переложить эту проверку на саму операционную систему. Более поздняя его вариация – функция .

Twisted

Twisted – кросс-платформенная сетевая библиотека, написанная на Python. Это асинхронный инструмент, который избавляет вас от необходимости использовать потоки. Он поддерживает работу с mail, web, news, chat, DNS, SSH, Telnet, RPC, и т.д.

Многие дистрибутивы Linux уже включают в себя twisted. Можно установить инструментарий из исходных текстов, которые лежат тут: http://twistedmatrix.com/projects/core/

В основе Twisted лежат события – event. Работой таких событий управляют специальные функции, называемые хэндлерами – event handler. Есть специальный бесконечный цикл по обработке событий – так называемый event loop. Он отлавливает события, после чего запускает соответствующие хэндлеры. После этого кошмар с последовательной обработкой событий заканчивается. За работу цикла event loop в twisted отвечает объект, называемый reactor, который находится в модуле twisted.internet . Для его запуска нужно вызвать команду:

  reactor.run()

Twisted Factory

Конфигурация поведения протокола прописывается в фабрике – классе Factory, который унаследован от twisted.internet.protocol.Factory. В программе может быть несколько протоколов, фабрика является для них организующим и конфигурационным компонентом. По умолчанию фабрика запускает каждый протокол, и устанавливает ему атрибут, называемый factory, который указывает на себя. Пример фабрики, которая позволяет протоколу писать лог-файл:

from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
 
class LoggingProtocol(LineReceiver):
    def lineReceived(self, line):
        self.factory.fp.write(line+'\n')

class LogfileFactory(Factory):
    protocol = LoggingProtocol
    def __init__(self, fileName):
        self.file = fileName
    def startFactory(self):
        self.fp = open(self.file, 'a')
    def stopFactory(self):
        self.fp.close()

What is Threading?

The threading module comes pre-installed with python so there are no downloads or installs for this tutorial. Threading allows us to call a method or class that has extended the class to run alongside the main thread (the linear flow that generally happens).

One good use of threading is to create multiple instances of things that take time outside your program. A great example of this is sending requests to a server. If you want to send many, instead of waiting for them to finish one-by-one before you send the next request, you can create many threads to request different URLs and they will then all be waiting at the same time.

Hands-On on this Python Threading Tutorial

Python threading library

Python has several ways to implement multithreading. The modern way to do it is using the library, which contains the class. As we will see, working with this library is extremely intuitive. We also want to use the library to experiment threads, but this is not strictly needed in production. Thus, at the very beginning of your script, add the following lines.

import threading
import time

Now we can start working with threads!

Define a test function to run in threads

First thing, we need to define a function we want to run in a thread. Our function will do almost nothing, but we will use the function to emulate a huge workload. The function makes your program (or thread) stop and wait for a given amount of seconds.

When you have a function that you want to run in a thread, it is best practice to use a thread identifier as the first parameter. This way, you know what thread you are running from inside the function as well. However, you also need to pass this value when creating the thread, but we will get to that. For now, just create this function.

def do_something(id, message=''):
  time.sleep(4)
  print("Thread #" + str(id) + " finished with message: " + message)

This function simply waits 4 seconds sand then prints the given message with the thread ID.

Running it synchronously “the standard way”

At this point, we can create a global list of messages that we want to print. Here is an example.

messages = 

If we were to run the function with all the messages synchronously we would roughly need 16 seconds (4 seconds per message). In fact, we can do a simple test using the function. This returns the epoch time in seconds, and we can use it twice to see how much time elapsed from the beginning to the end of the script.

start = time.time()

for i, msg in enumerate(messages):
  do_something(i, msg)

print("It took " + str(time.time()-start) + " seconds")

And here is the output. The time elapsed between printing lines, of course, was about 4 seconds.

Running it with threads

Now we can dive in the real python threading tutorial. We can rewrite this part of the script to work with threads, and distribute the load among them. Here we need to work with three different functions.

Creating the thread

To create a thread, you need to instantiate a object. The constructor wants to know a target function: the function that you want to run within the thread. It also want to know a list of parameters you want to pass to the function, if you need it. You provide the function name as , and the parameters as a tuple for the parameter. Below, a sample code to create a thread.

thread = threading.Thrad(target=function_name, args=(arg1, arg2, arg3))

From now on, you can perform operation on this thread object you just created.

Starting and joining the thread

Once you have a thread object, you can decide to start it with the function, and to join it with the function, as simple as that. The code is pretty straight forward, as you can see below.

thread.start()
thread.join()

In order to call , you need to call first. However, you don’t need to call the two one after the other. In fact, you might want to perform some code after starting the thread, and before joining it. Even more, you may not join a thread all.

The whole script

Combining the commands above, we can create a way more efficient snippet that leverages threads. Here it is.

start = time.time()
threads = []

for i, msg in enumerate(messages):
  threads.append(threading.Thread(target=do_something, args=(i, msg,)))
  threads.start()
for thread in threads:
  thread.join()

print("It took " + str(time.time()-start) + " seconds")

As you can see, we first create and start all the threads. Then, with another loop, we join all of them. We didn’t join each thread just after starting on purpose. If we did, the script would have waited for the first thread to finish before starting the second. Of course, we don’t want that. If we run this script you won’t see any output for about 4 seconds, then all four lines of the output will appear together. It will run in a little more than 4 seconds, take a look.

Synchronizing Threads¶

In addition to using , another way of synchronizing
threads is through using a object. Because the
uses a , it can be tied to a shared
resource, allowing multiple threads to wait for the resource to be
updated. In this example, the threads wait for the
to be set before continuing. The
thread is responsible for setting the condition and notifying the
other threads that they can continue.

threading_condition.py

import logging
import threading
import time


def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond
        cond.wait()
        logging.debug('Resource is available to consumer')


def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond
        logging.debug('Making resource available')
        cond.notifyAll()


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()

The threads use to acquire the lock associated with
the . Using the and
methods explicitly also works.

$ python3 threading_condition.py

2016-07-10 10:45:28,170 (c1) Starting consumer thread
2016-07-10 10:45:28,376 (c2) Starting consumer thread
2016-07-10 10:45:28,581 (p ) Starting producer thread
2016-07-10 10:45:28,581 (p ) Making resource available
2016-07-10 10:45:28,582 (c1) Resource is available to consumer
2016-07-10 10:45:28,582 (c2) Resource is available to consumer

Barriers are another thread synchronization mechanism. A
establishes a control point and all participating
threads block until all of the participating “parties” have reached
that point. It lets threads start up separately and then pause until
they are all ready to proceed.

threading_barrier.py

import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    worker_id = barrier.wait()
    print(threading.current_thread().name, 'after barrier',
          worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)

threads = 
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)


for t in threads
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

for t in threads
    t.join()

In this example, the is configured to block until
three threads are waiting. When the condition is met, all of the
threads are released past the control point at the same time. The
return value from indicates the number of the party being
released, and can be used to limit some threads from taking an action
like cleaning up a shared resource.

$ python3 threading_barrier.py

worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1

The method of causes all of the waiting
threads to receive a . This allows threads
to clean up if processing is stopped while they are blocked on
.

threading_barrier_abort.py

import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    try
        worker_id = barrier.wait()
    except threading.BrokenBarrierError
        print(threading.current_thread().name, 'aborting')
    else
        print(threading.current_thread().name, 'after barrier',
              worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS + 1)

threads = 
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)


for t in threads
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

barrier.abort()

for t in threads
    t.join()

This example configures the to expect one more
participating thread than is actually started so that processing in
all of the threads is blocked. The call raises an
exception in each blocked thread.

Threading a Method

A basic example of threading can be seen below:

In this example, we create a place to store the threads and then loop 4 times as we create the threads. To create the threads, we first initialise a threading.Thread instance passing the method as the target. We then add this thread to our previously create list and start it. This will provide the following output:

Note that there are no brackets after worker when defining the target. This is because we need to pass the method, not what the method returns (even if it returns nothing).

Passing Arguments

To pass arguments to a method when creating the thread, we can pass a tuple to args. For example:

In this expanded example I have added a number parameter to which will then be used in the output. When creating the instance of threading.Thread, I pass the variable to in a tuple. This will provide the output:

The reason args is set to and not is because if we left out the comma, the type would no longer be a tuple and would cause an error.

Параллелизм и конкурентность

Питон предоставляет широкие возможности как для параллельного, так и для конкурентного программирования, однако не обходиться без особенностей.

Если вам нужен параллелизм, а это бывает когда ваши задачи требуют вычислений, то вам стоит обратить внимание на модуль. А если в ваших задачах много ожидания IO, то питон предоставляет массу вариантов на выбор, от тредов и gevent, до asyncio.
Все эти варианты выглядят вполне пригодными для использования (хотя треды значительно больше ресурсов требуют), но есть ощущение, что asyncio потихоньку выдавливает остальных, в том числе благодаря всяким плюшками типа uvloop

А если в ваших задачах много ожидания IO, то питон предоставляет массу вариантов на выбор, от тредов и gevent, до asyncio.
Все эти варианты выглядят вполне пригодными для использования (хотя треды значительно больше ресурсов требуют), но есть ощущение, что asyncio потихоньку выдавливает остальных, в том числе благодаря всяким плюшками типа uvloop.

Если кто не заметил — в питоне треды это не про параллельность, я недостаточно компетентен, чтобы хорошо рассказать про GIL, но по это теме достаточно материалов, поэтому и нет такой необходимости, главное, что нужно запомнить это то, что треды в питоне (точнее в CPython) ведут себя не так как это принято в других языках программирования — они исполняются только на одном ядре, а значит не подходят для случаев когда вам нужна настоящая параллельность, однако, выполнение тредов приостанавливается при ожидании ввода-вывода, поэтому их можно использовать для конкурентности.

Задачи с ограничением скорости вычислений и ввода-вывода

Время выполнения задач, ограниченных скоростью вычислений, полностью зависит от производительности процессора, тогда как в задачах I/O Bound скорость выполнения процесса ограничена скоростью системы ввода-вывода.

В задачах с ограничением скорости вычислений программа расходует большую часть времени на использование центрального процессора, то есть на выполнение вычислений. К таким задачам можно отнести программы, занимающиеся исключительно перемалыванием чисел и проведением расчётов.

В задачах, ограниченных скоростью ввода-вывода, программы обрабатывают большие объёмы данных с диска в сравнении с необходимым объёмом вычислений. К таким задачам можно отнести, например, подсчёт количества строк в файле.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Adblock
detector