Внутри процесса: многопоточность и пинг-понг mutex’ом
Содержание:
Как работают потоки в питоне
Если нужно, чтобы ваше приложение выполняло несколько задач в одно и то же время, то можете воспользоваться потоками (threads). Потоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность (multi-threading) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр.
Когда в одной программе работают несколько потоков, возникает проблема разграничения доступа потоков к общим данным. Предположим, что есть два потока, имеющих доступ к общему списку. Первый поток может делать итерацию по этому списку:
for x in L
а второй в этот момент начнет удалять значения из этого списка. Тут может произойти все что угодно: программа может упасть, или мы просто получим неверные данные.
Решением в этом случае является применение блокировки. При этом доступ к заблокированному списку будет иметь только один поток, второй будет ждать, пока блокировка не будет снята.
Применение блокировки порождает другую проблему – дедлок (deadlock) – мертвая блокировка. Пример дедлока: имеется два потока и два списка. Первый поток блокирует первый список, второй поток блокирует второй список. Первый поток изнутри первой блокировки пытается получить доступ к уже заблокированному второму списку, второй поток пытается проделать то же самое с первым списком. Получается неопределенная ситуация с бесконечным ожиданием. Эту ситуации легко описать, на практике все гораздо сложнее.
Вариантом решения проблемы дедлоков является политика определения очередности блокировок. Например, в предыдущем примере мы должны определить, что блокировка первого списка идет всегда первой, а уже потом идет блокировка второго списка.
Другая проблема с блокировками – в том, что несколько потоков могут одновременно ждать доступа к уже заблокированному ресурсу и при этом ничего не делать. Каждая питоновская программа всегда имеет главный управляющий поток.
Питоновская реализация многопоточности ограниченная. Интерпретатор питона использует внутренний глобальный блокировщик (GIL), который позволяет выполняться только одному потоку. Это сводит на нет преимущества многоядерной архитектуры процессоров. Для многопоточных приложений, которые работают в основном на дисковые операции чтения/записи, это не имеет особого значения, а для приложений, которые делят процессорное время между потоками, это является серьезным ограничением.
Средства синхронизации
Interlocked
dotPeek
Если MyEvent все еще такой как был на момент как мы начали выполнять Delegate.Combine, то запиши в него то, что вернет Delegate.Combine, а если нет, то не беда, давай попробуем еще раз и будем повторять пока не выйдет.
Monitor.Wait, Monitor.Pulse
Producer-Consumer — паттерн многопроцессного/многопоточного проектирования предполагающий наличие одного или нескольких потоков/процессов, производящих данные и один или несколько процессов/потоков эти данные обрабатывающие. Как правило использует общую коллекцию.
(Использовал именно изображение, а не текст, чтобы наглядно показать порядок выполнения инструкций)Разбор:В есть важная ремарка, касательно использования методов Pulse/Wait, а именно: Monitor не хранит информацию о состоянии, а значит, если вызов метода Pulse до вызова метода Wait может привести к дедлоку. Если такая ситуация возможна, то лучше использовать один из классов семейства ResetEvent.
ReaderWriterLockSlim
Идея: много потоков может читать, лишь один писать. Как только поток заявляет о желании писать, новые чтения не могут быть начаты, а будут ожидать завершения записи. Так же есть понятие upgradeable-read-lock, который можно использовать если в процессе чтения понимаете, о необходимости что-то записать, такой lock будет преобразован в write-lock за одну атомарную операцию.В пространстве имен System.Threading так же есть класс ReadWriteLock, но он крайне не рекомендован для новой разработки. Slim версия позволит избежать ряда случаев, приводящих к дедлокам, к тому же позволяет быстро захватить блокировку, т.к. поддерживает синхронизацию в режиме spin-wait перед уходом в режим ядра.
Создайте поток, реализуя работающий интерфейс
Если ваш класс предназначен для выполнения в виде потока, вы можете добиться этого, реализуя интерфейс Runnable. Вам нужно будет выполнить три основных шага.
Шаг 1
В качестве первого шага вам необходимо реализовать метод run(), предоставляемый интерфейсом Runnable. Этот метод обеспечивает точку входа для потока, и вы поместите всю свою бизнес-логику в этот метод. Ниже приведен простой синтаксис метода run().
public void run( )
Шаг 2
На втором этапе вы создадите экземпляр объекта Thread, используя следующий конструктор.
Thread(Runnable threadObj, String threadName);
Где — это экземпляр класса, который реализует интерфейс Runnable, а threadName — это имя, данное новому потоку.
Шаг 3
Как только объект Thread создан, вы можете запустить его, вызвав метод start(), который выполняет вызов метода run(). Ниже приведен простой синтаксис метода start().
void start();
Пример
Вот пример, который создает новый поток и запускает его.
class RunnableDemo implements Runnable { private Thread t; private String threadName; RunnableDemo( String name) { threadName = name; System.out.println("Creating " + threadName ); } public void run() { System.out.println("Running " + threadName ); try { for(int i = 4; i > 0; i--) { System.out.println("Thread: " + threadName + ", " + i); // Let the thread sleep for a while. Thread.sleep(50); } } catch (InterruptedException e) { System.out.println("Thread " + threadName + " interrupted."); } System.out.println("Thread " + threadName + " exiting."); } public void start () { System.out.println("Starting " + threadName ); if (t == null) { t = new Thread (this, threadName); t.start (); } } } public class TestThread { public static void main(String args[]) { RunnableDemo R1 = new RunnableDemo( "Thread-1"); R1.start(); RunnableDemo R2 = new RunnableDemo( "Thread-2"); R2.start(); } }
Итог
Creating Thread-1 Starting Thread-1 Creating Thread-2 Starting Thread-2 Running Thread-1 Thread: Thread-1, 4 Running Thread-2 Thread: Thread-2, 4 Thread: Thread-1, 3 Thread: Thread-2, 3 Thread: Thread-1, 2 Thread: Thread-2, 2 Thread: Thread-1, 1 Thread: Thread-2, 1 Thread Thread-1 exiting. Thread Thread-2 exiting.
Synchronizing Threads
The threading module provided with Python includes a simple-to-implement locking mechanism that allows you to synchronize threads. A new lock is created by calling the Lock() method, which returns the new lock.
The acquire(blocking) method of the new lock object is used to force threads to run synchronously. The optional blocking parameter enables you to control whether the thread waits to acquire the lock.
If blocking is set to 0, the thread returns immediately with a 0 value if the lock cannot be acquired and with a 1 if the lock was acquired. If blocking is set to 1, the thread blocks and wait for the lock to be released.
The release() method of the new lock object is used to release the lock when it is no longer required.
Example
#!/usr/bin/python import threading import time class myThread (threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run(self): print "Starting " + self.name # Get lock to synchronize threads threadLock.acquire() print_time(self.name, self.counter, 3) # Free lock to release next thread threadLock.release() def print_time(threadName, delay, counter): while counter: time.sleep(delay) print "%s: %s" % (threadName, time.ctime(time.time())) counter -= 1 threadLock = threading.Lock() threads = [] # Create new threads thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # Start new Threads thread1.start() thread2.start() # Add threads to thread list threads.append(thread1) threads.append(thread2) # Wait for all threads to complete for t in threads: t.join() print "Exiting Main Thread"
When the above code is executed, it produces the following result −
Starting Thread-1 Starting Thread-2 Thread-1: Thu Mar 21 09:11:28 2013 Thread-1: Thu Mar 21 09:11:29 2013 Thread-1: Thu Mar 21 09:11:30 2013 Thread-2: Thu Mar 21 09:11:32 2013 Thread-2: Thu Mar 21 09:11:34 2013 Thread-2: Thu Mar 21 09:11:36 2013 Exiting Main Thread
Блокировки (Lock)
В следующем примере будут созданы три потока, каждый из которых будет считывать стартовую страницу по указанному Web-адресу. В примере имеется глобальный ресурс – список урлов – – доступ к которому будет блокироваться с помощью блокировки . Объект имеет методы:
– делает запрос на запирание замка. Если параметр не указан или является истиной, то поток будет ожидать освобождения замка.
Если параметр не был задан, метод не возвратит значения.
Если был задан и истинен, метод возвратит True (после успешного овладения замком).
Если блокировка не требуется (т.е. задан ), метод вернет , если замок не был заперт и им успешно овладел данный поток. В противном случае будет возвращено .
– запрос на отпирание замка.
– возвращает текущее состояние замка ( – заперт, – открыт).
import threading from urllib import urlopen class WorkerThread(threading.Thread): def __init__(self,url_list,url_list_lock): super(WorkerThread,self).__init__() self.url_list=url_list self.url_list_lock=url_list_lock def run(self): while (1): nexturl = self.grab_next_url() if nexturl==None:break self.retrieve_url(nexturl) def grab_next_url(self): self.url_list_lock.acquire(1) if len(self.url_list)<1: nexturl=None else: nexturl = self.url_list del self.url_list self.url_list_lock.release() return nexturl def retrieve_url(self,nexturl): text = urlopen(nexturl).read() print text print '################### %s #######################' % nexturl url_list=['http://linux.org.ru','http://kernel.org','http://python.org'] url_list_lock = threading.Lock() workerthreadlist=[] for x in range(0,3): newthread = WorkerThread(url_list,url_list_lock) workerthreadlist.append(newthread) newthread.start() for x in range(0,3): workerthreadlist.join()
Инициализация атрибутов потока
int pthread_attr_init (pthread_attr_t *attr_p)
Инициализирует структуру, указываемую attr_p, значениями «по умолчанию» (при этом распределяется кое-какая память).
Атрибуты потока:
- Область действия конкуренции (scope) — определяет связность потока с LWP.
- Отсоединенность (detachstate) — определяет то, может или нет какой-либо другой поток ожидать окончания данного (посредством функции).
- Адрес динамического стека потока (stackaddr) .
- Размер динамического стека потока(stacksize) .
- Приоритет потока (priority) .
- Правила и параметры планирования. Неприятно то, что schedpolicy по умолчанию устанавливается в SCHED_OTHER, зависимую от ОС.
Аппаратная реализация
На обычном процессоре управление потоками осуществляется операционной системой. Поток исполняется до тех пор, пока не произойдёт аппаратное прерывание, системный вызов или пока не истечёт отведённое для него операционной системой время. После этого процессор переключается на код операционной системы, который сохраняет состояние потока (его контекст) или переключается на состояние другого потока, которому тоже выделяется время на исполнение. При такой многопоточности достаточно большое количество тактов процессора тратится на код операционной системы, переключающий контексты. Если поддержку потоков реализовать аппаратно, то процессор сам сможет переключаться между потоками, а в идеальном случае — выполнять несколько потоков одновременно за каждый такт. Для операционной системы и пользователя один такой физический процессор будет виден как несколько логических процессоров.
Различают две формы многопоточности, которые могут быть реализованы в процессорах аппаратно:
- Временная многопоточность(англ. Temporal multithreading).
- Одновременная многопоточность (англ. Simultaneous multithreading).