Внутри процесса: многопоточность и пинг-понг mutex’ом

Как работают потоки в питоне

Если нужно, чтобы ваше приложение выполняло несколько задач в одно и то же время, то можете воспользоваться потоками (threads). Потоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность (multi-threading) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр.

Когда в одной программе работают несколько потоков, возникает проблема разграничения доступа потоков к общим данным. Предположим, что есть два потока, имеющих доступ к общему списку. Первый поток может делать итерацию по этому списку:

  for x in L

а второй в этот момент начнет удалять значения из этого списка. Тут может произойти все что угодно: программа может упасть, или мы просто получим неверные данные.

Решением в этом случае является применение блокировки. При этом доступ к заблокированному списку будет иметь только один поток, второй будет ждать, пока блокировка не будет снята.

Применение блокировки порождает другую проблему – дедлок (deadlock) – мертвая блокировка. Пример дедлока: имеется два потока и два списка. Первый поток блокирует первый список, второй поток блокирует второй список. Первый поток изнутри первой блокировки пытается получить доступ к уже заблокированному второму списку, второй поток пытается проделать то же самое с первым списком. Получается неопределенная ситуация с бесконечным ожиданием. Эту ситуации легко описать, на практике все гораздо сложнее.

Вариантом решения проблемы дедлоков является политика определения очередности блокировок. Например, в предыдущем примере мы должны определить, что блокировка первого списка идет всегда первой, а уже потом идет блокировка второго списка.

Другая проблема с блокировками – в том, что несколько потоков могут одновременно ждать доступа к уже заблокированному ресурсу и при этом ничего не делать. Каждая питоновская программа всегда имеет главный управляющий поток.

Питоновская реализация многопоточности ограниченная. Интерпретатор питона использует внутренний глобальный блокировщик (GIL), который позволяет выполняться только одному потоку. Это сводит на нет преимущества многоядерной архитектуры процессоров. Для многопоточных приложений, которые работают в основном на дисковые операции чтения/записи, это не имеет особого значения, а для приложений, которые делят процессорное время между потоками, это является серьезным ограничением.

Средства синхронизации

Interlocked

dotPeek

Если MyEvent все еще такой как был на момент как мы начали выполнять Delegate.Combine, то запиши в него то, что вернет Delegate.Combine, а если нет, то не беда, давай попробуем еще раз и будем повторять пока не выйдет.

Monitor.Wait, Monitor.Pulse

Producer-Consumer — паттерн многопроцессного/многопоточного проектирования предполагающий наличие одного или нескольких потоков/процессов, производящих данные и один или несколько процессов/потоков эти данные обрабатывающие. Как правило использует общую коллекцию.

(Использовал именно изображение, а не текст, чтобы наглядно показать порядок выполнения инструкций)Разбор:В есть важная ремарка, касательно использования методов Pulse/Wait, а именно: Monitor не хранит информацию о состоянии, а значит, если вызов метода Pulse до вызова метода Wait может привести к дедлоку. Если такая ситуация возможна, то лучше использовать один из классов семейства ResetEvent.

ReaderWriterLockSlim

Идея: много потоков может читать, лишь один писать. Как только поток заявляет о желании писать, новые чтения не могут быть начаты, а будут ожидать завершения записи. Так же есть понятие upgradeable-read-lock, который можно использовать если в процессе чтения понимаете, о необходимости что-то записать, такой lock будет преобразован в write-lock за одну атомарную операцию.В пространстве имен System.Threading так же есть класс ReadWriteLock, но он крайне не рекомендован для новой разработки. Slim версия позволит избежать ряда случаев, приводящих к дедлокам, к тому же позволяет быстро захватить блокировку, т.к. поддерживает синхронизацию в режиме spin-wait перед уходом в режим ядра.

Создайте поток, реализуя работающий интерфейс

Если ваш класс предназначен для выполнения в виде потока, вы можете добиться этого, реализуя интерфейс Runnable. Вам нужно будет выполнить три основных шага.

Шаг 1

В качестве первого шага вам необходимо реализовать метод run(), предоставляемый интерфейсом Runnable. Этот метод обеспечивает точку входа для потока, и вы поместите всю свою бизнес-логику в этот метод. Ниже приведен простой синтаксис метода run().

public void run( )

Шаг 2

На втором этапе вы создадите экземпляр объекта Thread, используя следующий конструктор.

Thread(Runnable threadObj, String threadName);

Где — это экземпляр класса, который реализует интерфейс Runnable, а threadName — это имя, данное новому потоку.

Шаг 3

Как только объект Thread создан, вы можете запустить его, вызвав метод start(), который выполняет вызов метода run(). Ниже приведен простой синтаксис метода start().

void start();

Пример

Вот пример, который создает новый поток и запускает его.

class RunnableDemo implements Runnable {
   private Thread t;
   private String threadName;
   RunnableDemo( String name) {
      threadName = name;
      System.out.println("Creating " +  threadName );
   }
   public void run() {
      System.out.println("Running " +  threadName );
      try {
         for(int i = 4; i > 0; i--) {
            System.out.println("Thread: " + threadName + ", " + i);
            // Let the thread sleep for a while.
            Thread.sleep(50);
         }
      } catch (InterruptedException e) {
         System.out.println("Thread " +  threadName + " interrupted.");
      }
      System.out.println("Thread " +  threadName + " exiting.");
   }
   public void start () {
      System.out.println("Starting " +  threadName );
      if (t == null) {
         t = new Thread (this, threadName);
         t.start ();
      }
   }
}
public class TestThread {
   public static void main(String args[]) {
      RunnableDemo R1 = new RunnableDemo( "Thread-1");
      R1.start();
      RunnableDemo R2 = new RunnableDemo( "Thread-2");
      R2.start();
   }   
}

Итог

Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 4
Running Thread-2
Thread: Thread-2, 4
Thread: Thread-1, 3
Thread: Thread-2, 3
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.

Synchronizing Threads

The threading module provided with Python includes a simple-to-implement locking mechanism that allows you to synchronize threads. A new lock is created by calling the Lock() method, which returns the new lock.

The acquire(blocking) method of the new lock object is used to force threads to run synchronously. The optional blocking parameter enables you to control whether the thread waits to acquire the lock.

If blocking is set to 0, the thread returns immediately with a 0 value if the lock cannot be acquired and with a 1 if the lock was acquired. If blocking is set to 1, the thread blocks and wait for the lock to be released.

The release() method of the new lock object is used to release the lock when it is no longer required.

Example

#!/usr/bin/python

import threading
import time

class myThread (threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print "Starting " + self.name
      # Get lock to synchronize threads
      threadLock.acquire()
      print_time(self.name, self.counter, 3)
      # Free lock to release next thread
      threadLock.release()

def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print "%s: %s" % (threadName, time.ctime(time.time()))
      counter -= 1

threadLock = threading.Lock()
threads = []

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()

# Add threads to thread list
threads.append(thread1)
threads.append(thread2)

# Wait for all threads to complete
for t in threads:
    t.join()
print "Exiting Main Thread"

When the above code is executed, it produces the following result −

Starting Thread-1
Starting Thread-2
Thread-1: Thu Mar 21 09:11:28 2013
Thread-1: Thu Mar 21 09:11:29 2013
Thread-1: Thu Mar 21 09:11:30 2013
Thread-2: Thu Mar 21 09:11:32 2013
Thread-2: Thu Mar 21 09:11:34 2013
Thread-2: Thu Mar 21 09:11:36 2013
Exiting Main Thread

Блокировки (Lock)

В следующем примере будут созданы три потока, каждый из которых будет считывать стартовую страницу по указанному Web-адресу. В примере имеется глобальный ресурс – список урлов – – доступ к которому будет блокироваться с помощью блокировки . Объект имеет методы:

– делает запрос на запирание замка. Если параметр не указан или является истиной, то поток будет ожидать освобождения замка.

Если параметр не был задан, метод не возвратит значения.

Если был задан и истинен, метод возвратит True (после успешного овладения замком).

Если блокировка не требуется (т.е. задан ), метод вернет , если замок не был заперт и им успешно овладел данный поток. В противном случае будет возвращено .

– запрос на отпирание замка.

– возвращает текущее состояние замка ( – заперт, – открыт).

import threading
from urllib import urlopen

class WorkerThread(threading.Thread):
  def __init__(self,url_list,url_list_lock):
    super(WorkerThread,self).__init__()
    self.url_list=url_list
    self.url_list_lock=url_list_lock
    
  def run(self):
    while (1):
      nexturl = self.grab_next_url()
      if nexturl==None:break
      self.retrieve_url(nexturl)
        
  def grab_next_url(self):
    self.url_list_lock.acquire(1)
    if len(self.url_list)<1:
      nexturl=None
    else:
      nexturl = self.url_list
      del self.url_list
    self.url_list_lock.release()
    return nexturl  
        
        
  def retrieve_url(self,nexturl):
    text = urlopen(nexturl).read()
    print text
    print '################### %s #######################' % nexturl
    
url_list=['http://linux.org.ru','http://kernel.org','http://python.org']
url_list_lock = threading.Lock()
workerthreadlist=[]
for x in range(0,3):
  newthread = WorkerThread(url_list,url_list_lock)
  workerthreadlist.append(newthread)
  newthread.start()
for x in range(0,3):
  workerthreadlist.join()

Инициализация атрибутов потока

int pthread_attr_init (pthread_attr_t *attr_p)

Инициализирует структуру, указываемую attr_p, значениями «по умолчанию» (при этом распределяется кое-какая память).
Атрибуты потока:

  • Область действия конкуренции (scope) — определяет связность потока с LWP.
  • Отсоединенность (detachstate) — определяет то, может или нет какой-либо другой поток ожидать окончания данного (посредством функции).
  • Адрес динамического стека потока (stackaddr) .
  • Размер динамического стека потока(stacksize) .
  • Приоритет потока (priority) .
  • Правила и параметры планирования. Неприятно то, что schedpolicy по умолчанию устанавливается в SCHED_OTHER, зависимую от ОС.

Аппаратная реализация

На обычном процессоре управление потоками осуществляется операционной системой. Поток исполняется до тех пор, пока не произойдёт аппаратное прерывание, системный вызов или пока не истечёт отведённое для него операционной системой время. После этого процессор переключается на код операционной системы, который сохраняет состояние потока (его контекст) или переключается на состояние другого потока, которому тоже выделяется время на исполнение. При такой многопоточности достаточно большое количество тактов процессора тратится на код операционной системы, переключающий контексты. Если поддержку потоков реализовать аппаратно, то процессор сам сможет переключаться между потоками, а в идеальном случае — выполнять несколько потоков одновременно за каждый такт. Для операционной системы и пользователя один такой физический процессор будет виден как несколько логических процессоров.

Различают две формы многопоточности, которые могут быть реализованы в процессорах аппаратно:

  • Временная многопоточность(англ. Temporal multithreading).
  • Одновременная многопоточность (англ. Simultaneous multithreading).
Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Adblock
detector