Семафоры windows. Объекты синхронизации в Windows

План занятия:

· Семафоры

· Мьютекс

· Правила упрощенного параллелизма

· Рекурсивный мьютекс

· Условные переменные

Механизмами синхронизации являются средства операционной системы, которые помогают решать основную задачу синхронизации - обеспечивать координацию потоков, которые работают с совместно используемыми данными. Такие средства являются минимальными блоками для построения многопоточных программ, их называют синхронизационными механизмами.

Синхронизационные механизмы подразделяют на следующие основные категории:

  • универсальные для низкого уровня - можно использовать различными способами (семафоры);
  • простые для низкого уровня - каждый из которых приспособлен к решению только одной задачи (мьютексы и условные переменные);
  • универсальные для высокого уровня, выраженные через простые - к этой группе относится концепция монитора, которая может быть выражена через мьютексы и условные переменные;
  • простые для высокого уровня - приспособленные к решению конкретной синхронизационной задачи (блокировка чтения-записи и барьеры).

Семафоры

Концепцию семафоров предложил в 1965 году Э. Дейкстра - известный голландский специалист в области компьютерных наук. Семафоры являются старейшими синхронизационными примитивами из числа применяемых на практике.

Семафор - это совместно используемый неотъемлемый целочисленный счетчик, для которого задано начальное значение и определены следующие атомарные операции.

  • Уменьшение семафора (down): если значение семафора больше нуля, его уменьшают на единицу, если же значение равно нулю, этот поток переходит в состояние ожидания до тех пор, пока оно не станет больше нуля (говорят, что поток «ожидает семафор» или «заблокирован на семафоре»). Эту операцию называют также ожиданиям - wait;
  • Увеличение семафора (up): значение семафора увеличивается на единицу; когда при этом есть потоки, которые ожидают на семафоре, один из них выходит из ожидания и выполняет свою операцию уменьшения. Если на семафоре ожидают несколько потоков, то вследствие выполнения операции увеличения, его значение остается нулевым, но один из потоков продолжает выполнение (в большинстве реализаций выбор этого потока будет случайным). Эту операцию также называют сигнализацией - post.

Фактически значение семафора определяет количество потоков, которое может пройти через этот семафор без блокировки. Когда для семафора задано нулевое начальное значение, то он будет блокировать все потоки до тех пор, пока какой-то поток его не "откроет", выполнив операцию up. Операции up и down могут быть выполнены любыми потоками, имеющих доступ к семафора.


Мьютекс называют синхронизацией примитив, что не допускает выполнения некоторого фрагмента кода более чем одним потоком. Фактически мьютекс является реализацией блокировки на уровне ОС.

Мьютекс, как и следует из его названия, реализует взаимное исключение. Его основная задача - блокировать все потоки, которые пытаются получить доступ к коду, когда этот код уже выполняет некоторый поток.

Мьютекс может находиться в двух состояниях: свободном и занятом. Начальным состоянием является «свободный». Над мьютекс возможны две атомарные операции.

  • Занять мьютекс: если мьютекс был свободен, он становится занятым, и поток продолжает свое выполнение (входя в критическую секцию); если мьютекс был занят, поток переходит в состояние ожидания (говорят, что поток «ожидает мьютекс», или «заблокирован на мьютекс»), выполнение продолжает другой поток. Поток, который занял мьютекс, называют владельцем мьютекс;
  • Освободить мьютекс: мьютекс становится свободным; если на нем ожидают несколько потоков, из них выбирают один, он начинает выполняться, занимает мьютекс и входит в критическую секцию. В большинстве реализаций выбор потока будет случайным. Освободить мьютекс может только его владелец.

Правила упрощенного параллелизма

Правила упрощенного параллелизма предназначены для упрощения программирования на базе мьютексов. Они основываются на том очевидном факте, что мьютекс защищает не код критической секции, а совместно используемые данные внутри этой секции. Особенности работы упрощенного параллелизма:

  • Каждая переменная, которую совместно использует более одного поток, должна быть защищена отдельным мьютексом;
  • Перед каждой операцией изменения такой переменной, соответствующий мьютекс должен быть занят, а после изменения освобожден;
  • Если надо работать одновременно с несколькими совместно используемыми переменными, необходимо занять все их мьютексы до начала работы и освободить их только после полного окончания работы.

Рекурсивный мьютекс

Рекурсивный мьютекс - особый вид мьютекса. Он позволяет повторное занятие тем же потоком, а также отслеживает, какой поток пытается его занять.

Когда это не тот поток, который уже занимает, мьютекс ведет себя как обычный, и поток переходит в состояние ожидания. Когда же это тот самый поток, внутренний счетчик блокировок этого мьютекс увеличивают на единицу, и поток продолжает свое выполнение. В случае увольнения мьютекс внутренний счетчик уменьшается на единицу, для других потоков рекурсивный мьютекс будет разблокирована только тогда, когда счетчик дойдет до нуля (то есть когда все его блокировки одним потоком будут сняты).

Рекурсивные мьютексы менее эффективны в реализации, не могут быть использованы вместе с условными переменными (этот вопрос рассмотрим в следующем разделе), поэтому обращаться к ним нужно только тогда, когда без этого нельзя обойтись. Например, библиотека функций может использовать такие мьютексы для того, чтобы избежать взаимных блокировок при повторном вызове таких функций одним потоком.

Условные переменные

Условной переменной называют синхронизационные примитивы, позволяющие организовать ожидания выполнения условия внутри критической секции, заданной мьютексом. Условная переменная всегда связана с конкретным мьютексом и данными, защищенными этим мьютексом. Для условной переменной определены следующие операции:

  1. Ожидания (wait). Дополнительным входным параметром эта операция принимает мьютекс, который должен находиться в закрытом состоянии. Вызов ожидания происходит в ситуации, когда не выполняется некоторое условие и нужны потоки для продолжения работы. Вследствие выполнения ожидания поток прекращается (говорят, что он «ожидает условной переменной»), а мьютекс открывается (эти два действия происходят атомарно). Так другие потоки получают возможность войти в критическую секцию и изменить там данные, которые она защищает, возможно, выполнив условие, необходимое потоку. На этом операция ожидания не заканчивается - ее завершит другой поток, вызвав операцию сигнализации после того, как условие будет выполнено.
  2. Сигнализация (signal). Эту операцию поток должен выполнить после того, как войдет в критическую секцию и завершит работу с данными (выполнив условие, которое ожидал поток, вызвавший операцию wait). Эта операция проверяет, нет ли потоков, ожидающих условной переменной, и если такие потоки есть, переводит один из них в состояние готовности. В результате восстановления поток завершает выполнение операции ожидания и блокирует мьютекс (обновления и блокировки тоже происходят атомарно). Если нет ни одного потока, который ожидает условной переменной, операция сигнализирования не делает ничего, и информацию о ее выполнении в системе не сохраняют.
  3. Широковещательная сигнализация (broadcast) отличается от обычной тем, что перевод в состояние готовности и восстановление выполняют для всех потоков, ожидающих этой условной переменной, а не только для одного из них.

Таким образом, выполнение операции ожидания состоит из следующих этапов: открытие мьютекс, ожидания (пока другой поток не выполнит операцию signal или broadcast), закрытие мьютекс.

Иногда при работе с несколькими потоками или процессами появляется необходимость синхронизировать выполнение двух или более из них. Причина этого чаще всего заключается в том, что два или более потоков могут требовать доступ к разделяемому ресурсу, которыйреально не может быть предоставлен сразу нескольким потокам. Разделяемым называется ресурс, доступ к которому могут одновременно получать несколько выполняющихся задач.

Механизм, обеспечивающий процесс синхронизации, называется ограничением доступа. Необходимость в нем возникает также в тех случаях, когда один поток ожидает события, генерируемого другим потоком. Естественно, должен существовать какой-то способ, с помощью которого первой поток будет приостановлен до совершения события. После этого поток должен продолжить свое выполнение.

Имеется два общих состояния, в которых может находиться задача. Во-первых, задача может выполняться (или быть готовой к выполнению, как только получит доступ к ресурсам процессора). Во-вторых, задача может бытьблокирована. В этом случае ее выполнение приостановлено до тех пор, пока не освободится нужный ей ресурс или не произойдет определенное событие.

В Windows имеется специальные сервисы, которые позволяют определенным образом ограничить доступ к разделяемым ресурсам, ведь без помощи операционной системы отдельный процесс или поток не может сам определить, имеет ли он единоличный доступ к ресурсу. Операционная система Windows содержит процедуру, которая в течении одной непрерывной операции проверяет и, если это возможно, устанавливает флаг доступа к ресурсу. На языке разработчиков операционной системы такая операция называется операцией проверки и установки . Флаги, используемые для обеспечения синхронизации и управления доступом к ресурсам, называютсясемафорами (semaphore) . Интерфейс Win32 API обеспечивает поддержку семафоров и других объектов синхронизации. Библиотека MFC также включает поддержку данных объектов.

Объекты синхронизации и классы mfc

Интерфейс Win32 поддерживает четыре типа объектов синхронизации - все они так или иначе основаны на понятии семафора.

Первым типом объектов является собственно семафор, или классический (стандартный) семафор . Он позволяет ограниченному числу процессов и потоков обращаться к одному ресурсу. При этом доступ к ресурсу либо полностью ограничен (один и только один поток или процесс может обратиться к ресурсу в определенный период времени), либо одновременный доступ получает лишь малое количество потоков и процессов. Семафоры реализуются с помощью счетчика, значение которого уменьшается, когда задаче выделяется семафор, то увеличивается, когда задача освобождает семафор.

Вторым типом объектов синхронизации является исключающий (mutex) семафор . Он предназначен для полного ограничения доступа к ресурсу, чтобы в любой момент времени к ресурсу мог обратиться только один процесс или поток. Фактически, это особая разновидность семафора.

Третьим типом объектов синхронизации является событие , илиобъект события (event object). Он используется для блокирования доступа к ресурсу до тех пор, пока какой-нибудь другой процесс или поток не заявит о том, что данный ресурс может быть использован. Таким образом, данный объект сигнализирует о выполнении требуемого события.

При помощи объекта синхронизации четвертого типа можно запрещать выполнения определенных участков кода программы несколькими потоками одновременно. Для этого данные участки должны быть объявлены как критический раздел (critical section) . Когда в этот раздел входит один поток, другим потокам запрещается делать тоже самое до тех пор, пока первый поток не выйдет из данного раздела.

Критические разделы, в отличие от других типов объектов синхронизации, применяются только для синхронизации потоков внутри одного процесса. Другие же типы объектов могут быть использованы для синхронизации потоков внутри процесса или для синхронизации процессов.

В MFC механизм синхронизации, обеспечиваемый интерфейсом Win32 , поддерживается с помощью следующих классов, порожденных от класса CSyncObject:

    CCriticalSection - реализует критический раздел.

    CEvent - реализует объект события

    CMutex - реализует исключающий семафор.

    CSemaphore - реализует классический семафор.

Кроме этих классов в MFC определены также два вспомогательных класса синхронизации: CSingleLock иCMultiLock . Они контролируют доступ к объекту синхронизации и содержат методы, используемы для предоставления и освобождения таких объектов. КлассCSingleLock управляет доступом к одному объекту синхронизации, а классCMultiLock - к нескольким объектам. Далее будем рассматривать только классCSingleLock .

Когда какой-либо объект синхронизации создан, доступ к нему можно контролировать с помощью класса CSingleLock . Для этого необходимо сначала создать объект типаCSingleLock с помощью конструктора:

CSingleLock (CSyncObject* pObject, BOOL bInitialLock = FALSE);

Через первый параметр передается указатель на объект синхронизации, например семафор. Значение второго параметра определяет, должен ли конструктор попытаться получить доступ к данному объекту. Если этот параметр не равен нулю, то доступ будет получен, в противном случае попыток получить доступ не будет. Если доступ получен, то поток, создавший объект класса CSingleLock , будет остановлен до освобождения соответствующего объекта синхронизации методомUnlock классаCSingleLock .

Когда объект типа CSingleLock создан, доступ к объекту, на который указывал параметр pObject , может контролироваться с помощью двух функций: Lock иUnlock классаCSingleLock .

Метод Lock предназначен для получения доступа к объекту к объекту синхронизации. Вызвавший его поток приостанавливается до завершения данного метода, то есть до тех пор, пока не будет получен доступ к ресурсу. Значение параметра определяет, как долго функция будет ожидать получения доступа к требуемому объекту. Каждый раз при успешном завершении метода значение счетчика, связанного с объектом синхронизации, уменьшается на единицу.

Метод Unlock освобождает объект синхронизации, давая возможность другим потокам использовать ресурс. В первом варианте метода значение счетчика, связанного с данным объектом, увеличивается на единицу. Во втором варианте первый параметр определяет, на сколько это значение должно быть увеличено. Второй параметр указывает на переменную, в которую будет записано предыдущее значение счетчика.

При работе с классом CSingleLock общая процедура управления доступом к ресурсу такова:

    создать объект типа CSyncObj (например, семафор), который будет использоваться для управления доступом к ресурсу;

    с помощью созданного объекта синхронизации создать объект типа CSingleLock;

    для получения доступа к ресурсу вызвать метод Lock;

    выполнить обращение к ресурсу;

    вызвать метод Unlock , чтобы освободить ресурс.

Далее описывается, как создавать и использовать семафоры и объекты событий. Разобравшись с этими понятиями, можно достаточно просто изучить и использовать два других типа объектов снхронизации: критические секции и мьютексы.

При одновременном доступе нескольких процессов (или нескольких потоков одного процесса) к какому-либо ресурсу возникает проблема синхронизации. Поскольку поток в Win32 может быть остановлен в любой, заранее ему неизвестный момент времени, возможна ситуация, когда один из потоков не успел завершить модификацию ресурса (например, отображенной на файл области памяти), но был остановлен, а другой поток попытался обратиться к тому же ресурсу. В этот момент ресурс находится в несогласованном состоянии, и последствия обращения к нему могут быть самыми неожиданными - от порчи данных до нарушения защиты памяти.

Главной идеей, заложенной в основе синхронизации потоков в Win32, является использование объектов синхронизации и функций ожидания. Объекты могут находиться в одном из двух состояний - Signaled или Not Signaled. Функции ожидания блокируют выполнение потока до тех пор, пока заданный объект находится в состоянии Not Signaled. Таким образом, поток, которому необходим эксклюзивный доступ к ресурсу, должен выставить какой-либо объект синхронизации в несигнальное состояние, а по окончании - сбросить его в сигнальное. Остальные потоки должны перед доступом к этому ресурсу вызвать функцию ожидания, которая позволит им дождаться освобождения ресурса.

Рассмотрим, какие объекты и функции синхронизации предоставляет нам Win32 API.

Функции синхронизации

Функции синхронизации делятся на две основные категории: функции, ожидающие единственный объект, и функции, ожидающие один из нескольких объектов.

Функции, ожидающие единственный объект

Простейшей функцией ожидания является функция WaitForSingleObject:

Function WaitForSingleObject(hHandle: THandle; // идентификатор объекта dwMilliseconds: DWORD // период ожидания): DWORD; stdcall;

Функция ожидает перехода объекта hHandle в сигнальное состояние в течение dwMilliseconds миллисекунд. Если в качестве параметра dwMilliseconds передать значение INFINITE, функция будет ждать в течение неограниченного времени. Если dwMilliseconds равен 0, то функция проверяет состояние объекта и немедленно возвращает управление.

Функция возвращает одно из следующих значений:

Следующий фрагмент кода запрещает доступ к Action1 до перехода объекта ObjectHandle в сигнальное состояние (например, таким образом можно дожидаться завершения процесса, передав в качестве ObjectHandle его идентификатор, полученный функцией CreateProcess):

Var Reason: DWORD; ErrorCode: DWORD; Action1.Enabled:= FALSE; try repeat Application.ProcessMessages; Reason:= WailForSingleObject(ObjectHandle, 10); if Reason = WAIT_FAILED then begin ErrorCode:= GetLastError; raise Exception.CreateFmt(‘Wait for object failed with error: %d’, ); end; until Reason <> WAIT_TIMEOUT; finally Actionl.Enabled:= TRUE; end;

В случае когда одновременно с ожиданием объекта требуется перевести в сигнальное состояние другой объект, может использоваться функция SignalObjectAndWait:

Function SignalObjectAndWait(hObjectToSignal: THandle; // объект, который будет переведен в // сигнальное состояние hObjectToWaitOn: THandle; // объект, который ожидает функция dwMilliseconds: DWORD; // период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // завершение операции ввода-вывода): DWORD; stdcall;

Возвращаемые значения аналогичны функции WaitForSingleObject.

Объект hObjectToSignal может быть семафором, событием (event) либо мьютексом. Параметр bAlertable определяет, будет ли прерываться ожидание объекта в случае, если операционная система запросит у потока окончание операции асинхронного ввода-вывода либо асинхронный вызов процедуры. Более подробно это будет рассматриваться ниже.

Функции, ожидающие несколько объектов

Иногда требуется задержать выполнение потока до срабатывания одного или сразу всех из группы объектов. Для решения подобной задачи используются следующие функции:

Type TWOHandleArray = array of THandle; PWOHandleArray = ^TWOHandleArray; function WaitForMultipleObjects(nCount: DWORD; // Задает количество объектов lpHandles: PWOHandleArray; // Адрес массива объектов bWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds: DWORD // Период ожидания): DWORD; stdcall;

Функция возвращает одно из следующих значений:

Число в диапазоне от

WAIT_OBJECT_0 до WAIT_OBJECT_0 + nCount – 1

Если bWaitAll равно TRUE, то это число означает, что все объекты перешли в сигнальное состояние. Если FALSE - то, вычтя из возвращенного значения WAIT_OBJECT_0, мы получим индекс объекта в массиве lpHandles

Число в диапазоне от

WAIT_ABANDONED_0 до WAIT_ABANDONED_0 + nCount – 1

Если bWaitAll равно TRUE, это означает, что все объекты перешли в сигнальное состояние, но хотя бы один из владевших ими потоков завершился, не сделав объект сигнальным Если FALSE - то, вычтя из возвращенного значения WAIT_ABANDONED_0, мы получим в массиве lpHandles индекс объекта, при этом поток, владевший этим объектом, завершился, не сделав его сигнальным
WAIT_TIMEOUT Истек период ожидания
WAIT_FAILED Произошла ошибка

Например, в следующем фрагменте кода программа пытается модифицировать два различных ресурса, разделяемых между потоками:

Var Handles: array of THandle; Reason: DWORD; RestIndex: Integer; ... Handles := OpenMutex(SYNCHRONIZE, FALSE, ‘FirstResource’); Handles := OpenMutex(SYNCHRONIZE, FALSE, ‘SecondResource’); // Ждем первый из объектов Reason:= WaitForMultipleObjects(2, @Handles, FALSE, INFINITE); case Reason of WAIT_FAILED: RaiseLastWin32Error; WAIT_OBJECT_0, WAIT_ABANDONED_0: begin ModifyFirstResource; RestIndex:= 1; end; WAIT_OBJECT_0 + 1, WAIT_ABANDONED_0 + 1: begin ModifySecondResource; RestIndex:= 0; end; // WAIT_TIMEOUT возникнуть не может end; // Теперь ожидаем освобождения следующего объекта if WailForSingleObject(Handles, INFINITE) = WAIT_FAILED then RaiseLastWin32Error; // Дождались, модифицируем оставшийся ресурс if RestIndex = 0 then ModifyFirstResource else ModifySecondResource;

Описанную выше технику можно применять, если вы точно знаете, что задержка ожидания объекта будет незначительной. В противном случае ваша программа окажется «замороженной» и не сможет даже перерисовать свое окно. Если период задержки может оказаться значительным, то необходимо дать программе возможность реагировать на сообщения Windows. Выходом может стать использование функций с ограниченным периодом ожидания (и повторный вызов - в случае возврата WAIT_TIMEOUT) либо функции MsgWaitForMultipleObjects:

Function MsgWaitForMultipleObjects(nCount: DWORD; // количество объектов синхронизации var pHandles; // адрес массива объектов fWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds, // Период ожидания dwWakeMask: DWORD // Тип события, прерывающего ожидание): DWORD; stdcall;

Главное отличие этой функции от предыдущей - параметр dwWakeMask, который является комбинацией битовых флагов QS_XXX и задает типы сообщений, прерывающих ожидание функции независимо от состояния ожидаемых объектов. Например, маска QS_KEY позволяет прервать ожидание при появлении в очереди сообщений WM_KEYUP, WM_KEYDOWN, WM_SYSKEYUP или WM_SYSKEYDOWN, а маска QS_PAINT - сообщения WM_PAINT. Полный список значений, допустимых для dwWakeMask, имеется в документации по Windows SDK. При появлении в очереди потока, вызвавшего функцию, сообщений, соответствующих заданной маске, функция возвращает значение WAIT_OBJECT_0 + nCount. Получив это значение, ваша программа может обработать его и снова вызвать функцию ожидания. Рассмотрим пример с запуском внешнего приложения (необходимо, чтобы на время его работы вызывающая программа не реагировала на ввод пользователя, однако ее окно должно продолжать перерисовываться):

Procedure TForm1.Button1Click(Sender: TObject); var PI: TProcessInformation; SI: TStartupInfo; Reason: DWORD; Msg: TMsg; begin // Инициализируем структуру TStartupInfo FillChar(SI, SizeOf(SI), 0); SI.cb:= SizeOf(SI); // Запускаем внешнюю программу Win32Check(CreateProcess(NIL, "COMMAND.COM", NIL, NIL, FALSE, 0, NIL, NIL, SI, PI)); //************************************************** // Попробуйте заменить нижеприведенный код на строку // WaitForSingleObject(PI.hProcess, INFINITE); // и посмотреть, как будет реагировать программа на // перемещение других окон над ее окном //************************************************** repeat // Ожидаем завершения дочернего процесса или сообщения // перерисовки WM_PAINT Reason:= MsgWaitForMultipleObjects(1, PI.hProcess, FALSE, INFINITE, QS_PAINT); if Reason = WAIT_OBJECT_0 + 1 then begin // В очереди сообщений появился WM_PAINT – Windows // требует обновить окно программы. // Удаляем сообщение из очереди PeekMessage(Msg, 0, WM_PAINT, WM_PAINT, PM_REMOVE); // И перерисовываем наше окно Update; end; // Повторяем цикл, пока не завершится дочерний процесс until Reason = WAIT_OBJECT_0; // Удаляем из очереди накопившиеся там сообщения while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) do; CloseHandle(PI.hProcess); CloseHandle(PI.hThread) end;

Если в потоке, вызывающем функции ожидания, явно (функцией CreateWindow) или неявно (используя TForm, DDE, COM) создаются окна Windows - поток должен обрабатывать сообщения. Поскольку широковещательные сообщения посылаются всем окнам в системе, то поток, не обрабатывающий сообщения, может вызвать взаимоблокировку (система ждет, когда поток обработает сообщение, поток - когда система или другие потоки освободят объект) и привести к зависанию Windows. Если в вашей программе имеются подобные фрагменты, необходимо использовать MsgWaitForMultipleObjects или MsgWaitForMultipleObjectsEx и позволять прервать ожидание для обработки сообщений. Алгоритм аналогичен вышеприведенному примеру.

Прерывание ожидания по запросу на завершение операции ввода-вывода или APC

Windows поддерживает асинхронные вызовы процедур. При создании каждого потока (thread) с ним ассоциируется очередь асинхронных вызовов процедур (APC queue). Операционная система (или приложение пользователя - при помощи функции QueueUserAPC) может помещать в нее запросы на выполнение функций в контексте данного потока. Эти функции не могут быть выполнены немедленно, поскольку поток может быть занят. Поэтому операционная система вызывает их, когда поток вызывает одну из следующих функций ожидания:

Function SleepEx(dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function WaitForSingleObjectEx(hHandle: THandle; // Идентификатор объекта dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function WaitForMultipleObjectsEx(nCount: DWORD; // количество объектов lpHandles: PWOHandleArray;// адрес массива идентификаторов объектов bWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function SignalObjectAndWait(hObjectToSignal: THandle; // объект, который будет переведен в // сигнальное состояние hObjectToWaitOn: THandle; // объект, которого ожидает функция dwMilliseconds: DWORD; // период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function MsgWaitForMultipleObjectsEx(nCount: DWORD; // количество объектов синхронизации var pHandles; // адрес массива объектов fWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds, // Период ожидания dwWakeMask: DWORD // Тип события, прерывающего ожидание dwFlags: DWORD // Дополнительные флаги): DWORD; stdcall;

Если параметр bAlertable равен TRUE (либо если dwFlags в функции MsgWaitForMultipleObjectsEx содержит MWMO_ALERTABLE), то при появлении в очереди APC запроса на асинхронный вызов процедуры операционная система выполняет вызовы всех имеющихся в очереди процедур, после чего функция возвращает значение WAIT_IO_COMPLETION.

Такой механизм позволяет реализовать, например, асинхронный ввод-вывод. Поток может инициировать фоновое выполнение одной или нескольких операций ввода-вывода функциями ReadFileEx или WriteFileEx, передав им адреса функций-обработчиков завершения операции. По завершении вызовы этих функций будут поставлены в очередь асинхронного вызова процедур. В свою очередь, инициировавший операции поток, когда он будет готов обработать результаты, может, используя одну из вышеприведенных функций ожидания, позволить операционной системе вызвать функции-обработчики. Поскольку очередь APC реализована на уровне ядра ОС, она более эффективна, чем очередь сообщений, и позволяет реализовать гораздо более эффективный ввод-вывод.

Event (событие)

Event позволяет известить один или несколько ожидающих потоков о наступлении события. Event бывает:

Для создания объекта используется функция CreateEvent:

Function CreateEvent(lpEventAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes bManualReset, // Задает, будет Event переключаемым // вручную (TRUE) или автоматически (FALSE) bInitialState: BOOL; // Задает начальное состояние. Если TRUE - // объект в сигнальном состоянии lpName: PChar // Имя или NIL, если имя не требуется): THandle; stdcall; // Возвращает идентификатор созданного // объекта Структура TSecurityAttributes описана, как: TSecurityAttributes = record nLength: DWORD; // Размер структуры, должен // инициализироваться как // SizeOf(TSecurityAttributes) lpSecurityDescriptor: Pointer; // Адрес дескриптора защиты. В // Windows 95 и 98 игнорируется // Обычно можно указывать NIL bInheritHandle: BOOL; // Задает, могут ли дочерние // процессы наследовать объект end;

Если не требуется задание особых прав доступа под Windows NT или возможности наследования объекта дочерними процессами, в качестве параметра lpEventAttributes можно передавать NIL. В этом случае объект не может наследоваться дочерними процессами и ему задается дескриптор защиты «по умолчанию».

Параметр lpName позволяет разделять объекты между процессами. Если lpName совпадает с именем уже существующего объекта типа Event, созданного текущим или любым другим процессом, то функция не создает нового объекта, а возвращает идентификатор уже существующего. При этом игнорируются параметры bManualReset, bInitialState и lpSecurityDescriptor. Проверить, был ли объект создан или используется уже существующий, можно следующим образом:

HEvent:= CreateEvent(NIL, TRUE, FALSE, ‘EventName’); if hEvent = 0 then RaiseLastWin32Error; if GetLastError = ERROR_ALREADY_EXISTS then begin // Используем ранее созданный объект end;

Если объект используется для синхронизации внутри одного процесса, его можно объявить как глобальную переменную и создавать без имени.

Имя объекта не должно совпадать с именем любого из существующих объектов типов Semaphore, Mutex, Job, Waitable Timer или FileMapping. В случае совпадения имен функция возвращает ошибку.

Если известно, что Event уже создан, для получения доступа к нему можно вместо CreateEvent воспользоваться функцией OpenEvent:

Function OpenEvent(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор объекта либо 0 - в случае ошибки. Параметр dwDesiredAccess может принимать одно из следующих значений:

После получения идентификатора можно приступать к его использованию. Для этого имеются следующие функции:

Function SetEvent(hEvent: THandle): BOOL; stdcall;

Устанавливает объект в сигнальное состояние

Function ResetEvent(hEvent: THandle): BOOL; stdcall;

Сбрасывает объект, устанавливая его в несигнальное состояние

Function PulseEvent(hEvent: THandle): BOOL; stdcall

Устанавливает объект в сигнальное состояние, дает отработать всем функциям ожидания, ожидающим этот объект, а затем снова сбрасывает его.

В Windows API события используются для выполнения операций асинхронного ввода-вывода. Следующий пример показывает, как приложение инициирует запись одновременно в два файла, а затем ожидает завершения записи перед продолжением работы; такой подход может обеспечить более высокую производительность при высокой интенсивности ввода-вывода, чем последовательная запись:

Var Events: array of THandle; // Массив объектов синхронизации Overlapped: array of TOverlapped; ... // Создаем объекты синхронизации Events := CreateEvent(NIL, TRUE, FALSE, NIL); Events := CreateEvent(NIL, TRUE, FALSE, NIL); // Инициализируем структуры TOverlapped FillChar(Overlapped, SizeOf(Overlapped), 0); Overlapped.hEvent:= Events; Overlapped.hEvent:= Events; // Начинаем асинхронную запись в файлы WriteFile(hFirstFile, FirstBuffer, SizeOf(FirstBuffer), FirstFileWritten, @Overlapped); WriteFile(hSecondFile, SecondBuffer, SizeOf(SecondBuffer), SecondFileWritten, @Overlapped); // Ожидаем завершения записи в оба файла WaitForMultipleObjects(2, @Events, TRUE, INFINITE); // Уничтожаем объекты синхронизации CloseHandle(Events); CloseHandle(Events)

По завершении работы с объектом он должен быть уничтожен функцией CloseHandle.

Delphi предоставляет класс TEvent, инкапсулирующий функциональность объекта Event. Класс расположен в модуле SyncObjs.pas и объявлен следующим образом:

Type TWaitResult = (wrSignaled, wrTimeout, wrAbandoned, wrError); TEvent = class(THandleObject) public constructor Create(EventAttributes: PSecurityAttributes; ManualReset, InitialState: Boolean; const Name: string); function WaitFor(Timeout: DWORD): TWaitResult; procedure SetEvent; procedure ResetEvent; end;

Назначение методов очевидно следует из их названий. Использование этого класса позволяет не вдаваться в тонкости реализации вызываемых функций Windows API. Для простейших случаев объявлен еще один класс с упрощенным конструктором:

Type TSimpleEvent = class(TEvent) public constructor Create; end; … constructor TSimpleEvent.Create; begin FHandle:= CreateEvent(nil, True, False, nil); end;

Mutex (Mutually Exclusive)

Мьютекс - это объект синхронизации, который находится в сигнальном состоянии только тогда, когда не принадлежит ни одному из процессов. Как только хотя бы один процесс запрашивает владение мьютексом, он переходит в несигнальное состояние и остается таким до тех пор, пока не будет освобожден владельцем. Такое поведение позволяет использовать мьютексы для синхронизации совместного доступа нескольких процессов к разделяемому ресурсу. Для создания мьютекса используется функция:

Function CreateMutex(lpMutexAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes bInitialOwner: BOOL; // Задает, будет ли процесс владеть // мьютексом сразу после создания lpName: PChar // Имя мьютекса): THandle; stdcall;

Функция возвращает идентификатор созданного объекта либо 0. Если мьютекс с заданным именем уже был создан, возвращается его идентификатор. В этом случае функция GetLastError вернет код ошибки ERROR_ALREDY_EXISTS. Имя не должно совпадать с именем уже существующего объекта типов Semaphore, Event, Job, Waitable Timer или FileMapping.

Если неизвестно, существует ли уже мьютекс с таким именем, программа не должна запрашивать владение объектом при создании (то есть должна передать в качестве bInitialOwner значение FALSE).

Если мьютекс уже существует, приложение может получить его идентификатор функцией OpenMutex:

Function OpenMutex(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор открытого мьютекса либо 0 - в случае ошибки. Мьютекс переходит в сигнальное состояние после срабатывания функции ожидания, в которую был передан его идентификатор. Для возврата в несигнальное состояние служит функция ReleaseMutex:

Function ReleaseMutex(hMutex: THandle): BOOL; stdcall;

Если несколько процессов обмениваются данными, например через файл, отображенный на память, то каждый из них должен содержать следующий код для обеспечения корректного доступа к общему ресурсу:

Var Mutex: THandle; // При инициализации программы Mutex:= CreateMutex(NIL, FALSE, ‘UniqueMutexName’); if Mutex = 0 then RaiseLastWin32Error; ... // Доступ к ресурсу WaitForSingleObject(Mutex, INFINITE); try // Доступ к ресурсу, захват мьютекса гарантирует, // что остальные процессы, пытающиеся получить доступ, // будут остановлены на функции WaitForSingleObject ... finally // Работа с ресурсом окончена, освобождаем его // для остальных процессов ReleaseMutex(Mutex); end; ... // При завершении программы CloseHandle(Mutex);

Подобный код удобно инкапсулировать в класс, который создает защищенный ресурс. Мьютекс имеет свойства и методы для оперирования ресурсом, защищая их при помощи функций синхронизации.

Разумеется, если работа с ресурсом может потребовать значительного времени, то необходимо либо использовать функцию MsgWaitForSingleObject, либо вызывать WaitForSingleObject в цикле с нулевым периодом ожидания, проверяя код возврата. В противном случае ваше приложение окажется замороженным. Всегда защищайте захват-освобождение объекта синхронизации при помощи блока try ... finally, иначе ошибка во время работы с ресурсом приведет к блокированию работы всех процессов, ожидающих его освобождения.

Semaphore (семафор)

Семафор представляет собой счетчик, содержащий целое число в диапазоне от 0 до максимальной величины, заданной при его создании. Счетчик уменьшается каждый раз, когда поток успешно завершает функцию ожидания, использующую семафор, и увеличивается путем вызова функции ReleaseSemaphore. При достижении семафором значения 0 он переходит в несигнальное состояние, при любых других значениях счетчика его состояние - сигнальное. Такое поведение позволяет использовать семафор в качестве ограничителя доступа к ресурсу, поддерживающему заранее заданное количество подключений.

Для создания семафора служит функция CreateSemaphore:

Function CreateSemaphore(lpSemaphoreAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes lInitialCount, // Начальное значение счетчика lMaximumCount: Longint; // Максимальное значение счетчика lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор созданного семафора либо 0, если создать объект не удалось.

Параметр lMaximumCount задает максимальное значение счетчика семафора, lInitialCount задает начальное значение счетчика и должен быть в диапазоне от 0 до lMaximumCount. lpName задает имя семафора. Если в системе уже есть семафор с таким именем, то новый не создается, а возвращается идентификатор существующего семафора. В случае если семафор используется внутри одного процесса, можно создать его без имени, передав в качестве lpName значение NIL. Имя семафора не должно совпадать с именем уже существующего объекта типов event, mutex, waitable timer, job или file-mapping.

Идентификатор ранее созданного семафора может быть также получен функцией OpenSemaphore:

Function OpenSemaphore(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Параметр dwDesiredAccess может принимать одно из следующих значений:

Для увеличения счетчика семафора используется функция ReleaseSemaphore:

Function ReleaseSemaphore(hSemaphore: THandle; // Идентификатор семафора lReleaseCount: Longint; // Счетчик будет увеличен на эту величину lpPreviousCount: Pointer // Адрес 32-битной переменной, // принимающей предыдущее значение // счетчика): BOOL; stdcall;

Если значение счетчика после выполнения функции превысит заданный для него функцией CreateSemaphore максимум, то ReleaseSemaphore возвращает FALSE и значение семафора не изменяется. В качестве параметра lpPreviousCount можно передать NIL, если это значение нам не нужно.

Рассмотрим пример приложения, запускающего на выполнение несколько заданий в отдельных потоках (например, программа для фоновой загрузки файлов из Internet). Если количество одновременно выполняющихся заданий будет слишком велико, то это приведет к неоправданной загрузке канала. Поэтому реализуем потоки, в которых будет выполняться задание, таким образом, чтобы когда их количество превышает заранее заданную величину, то поток бы останавливался и ожидал завершения работы ранее запущенных заданий:

Unit LimitedThread; interface uses Classes; type TLimitedThread = class(TThread) procedure Execute; override; end; implementation uses Windows; const MAX_THREAD_COUNT = 10; var Semaphore: THandle; procedure TLimitedThread.Execute; begin // Уменьшаем счетчик семафора. Если к этому моменту уже запущено // MAX_THREAD_COUNT потоков - счетчик равен 0 и семафор в // несигнальном состоянии. Поток будет заморожен до завершения // одного из запущенных ранее. WaitForSingleObject(Semaphore, INFINITE); // Здесь располагается код, отвечающий за функциональность потока, // например загрузка файла... // Поток завершил работу, увеличиваем счетчик семафора и позволяем // начать обработку другим потокам. ReleaseSemaphore(Semaphore, 1, NIL); end; initialization // Создаем семафор при старте программы Semaphore:= CreateSemaphore(NIL, MAX_THREAD_COUNT, MAX_THREAD_COUNT, NIL); finalization // Уничтожаем семафор по завершении программы CloseHandle(Semaphore); end;

В предыдущих частях статьи я рассказал об общих принципах и конкретных методах построения многопоточных приложений. Различным потокам практически всегда периодически требуется взаимодействие между собой, при этом неизбежно возникает необходимость в синхронизации. Сегодня мы рассмотрим важнейший, самый мощный и универсальный инструмент синхронизации Windows: объекты синхронизации ядра.

WaitForMultipleObjects и другие функции ожидания

Как вы помните, для синхронизации потоков, обычно требуется временно приостановить выполнение одного из потоков. При этом он должен быть переведён средствами операционной системы в состояние ожидания, в котором он не занимает процессорное время. Мы уже знаем две функции, которые могут это делать: SuspendThread и ResumeThread . Но как я рассказывал в предыдущей части статьи, в силу некоторых особенностей эти функции непригодны для синхронизации.

Сегодня мы рассмотрим другую функцию, которая также переводит в поток в состояние ожидания, но в отличие от SuspendThread/ResumeThread , специально предназначена именно для организации синхронизации. Это WaitForMultipleObjects . Поскольку эта функция очень важна, я несколько отступлю от своего правила не вдаваться в детали API и расскажу о ней подробнее, даже приведу ее прототип:

DWORD WaitForMultipleObjects(

DWORD nCount, // число объектов в массиве lpHandles

CONST HANDLE * lpHandles, // указатель на массив описателей объектов ядра

BOOL bWaitAll, // флаг, означающей надо ли дожидаться всех объектов или достаточно одного

DWORD dwMilliseconds // таймаут

Главный параметр этой функции - это указатель на массив хэндлов объектов ядра. О том, что это за объекты, мы поговорим ниже. Пока нам важно знать то, что любой из таких объектов может находиться в одном из двух состояний: нейтральном или «сигнализирующем» (signaled state). Если флаг bWaitAll равен FALSE, функция вернет управление, как только хотя бы один из объектов подаст сигнал. А если флаг равен TRUE, это произойдет только тогда, когда сразу все объекты начнут сигнализировать (как мы увидим, это важнейшее свойство этой функции). В первом случае по возвращаемому значению можно узнать, какой именно из объектов подал сигнал. Надо вычесть из него константу WAIT_OBJECT_0 , и получится индекс в массиве lpHandles. Если время ожидания превысило указанный в последнем параметре таймаут, функция прекратит ожидание и вернет значение WAIT_TIMEOUT . В качестве таймаута можно указать константу INFINITE , и тогда функция будет ждать «до упора», а можно наоборот 0, и тогда поток вообще не будет приостановлен. В последнем случае функция вернет управление немедленно, но по ее результату можно будет узнать состояние объектов. Последний прием используется очень часто. Как видите, эта функция обладает богатыми возможностями. Имеется еще несколько WaitForXXX функций, но все они представляют собой вариации на тему главной. В частности, WaitForSingleObject представляет собой всего лишь ее упрощенный вариант. Остальные имеют каждая свою дополнительную функциональность, но применяются, в общем-то, реже. Например, они дают возможность реагировать не только на сигналы объектов ядра, но и на поступление новых оконных сообщений в очередь потока. Их описание, так же как и детальные сведения о WaitForMultipleObjects , вы, как обычно, найдете в MSDN.

Теперь о том, что же это за таинственные «объекты ядра». Начнем с того, что в их число входят сами потоки и процессы. Они переходят в сигнализирующее состояние сразу по завершении. Это очень важная возможность, поскольку очень часто бывает необходимо отслеживать момент завершения потока или процесса. Пусть, например, наше серверное приложение с набором рабочих потоков должно завершиться. Управляющий поток при этом должен каким-либо способом проинформировать рабочие потоки о том, что пора заканчивать работу (например, установив глобальный флаг), после чего дождаться, пока все потоки не завершатся, сделав все необходимые для корректного завершения действия: освободив ресурсы, информировав клиентов о завершении работы, закрыв сетевые соединения и т.п.

То, что потоки включают сигнал по окончанию работы, позволяет решить задачу синхронизации с завершением потока предельно легко:

// Пусть для простоты у нас будет всего один рабочий поток. Запускаем его:

HANDLE hWorkerThread = :: CreateThread(...);

// Перед окончанием работы надо каким-либо образом сообщаем рабочему потоку, что пора закачивать.

// Ждем завершения потока:

DWORD dwWaitResult = :: WaitForSingleObject( hWorkerThread, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* обработка ошибки */ }

// "Хэндл" потока можно закрыть:

VERIFY(:: CloseHandle( hWorkerThread );

/* Если CloseHandle завершилась неудачей и вернула FALSE, я не выбрасываю исключение. Во-первых, даже если бы это произошло из-за системной ошибки, это не имело бы прямых последствий для нашей программы, ведь раз мы закрываем хендл, значит никакой работы с ним в дальнейшем не предполагается. Реально же неудача CloseHandle может означать только ошибку в вашей программе. Поэтому вставим здесь макрос VERIFY, чтобы не пропустить её на этапе отладки приложения. */

Аналогично будет выглядеть код, ожидающий завершения процесса.

Если бы такой встроенной в систему возможности не было, рабочий поток должен был бы сам как-то передать информацию о своем завершении главному. Даже если бы он делал бы это в самую последнюю очередь, главный поток не мог бы быть уверенным, что рабочему не осталось выполнить хотя бы пару ассемблерных инструкций. В отдельных ситуациях (например, если код потока находится в DLL, которая должна быть выгружена по его завершении) это может привести к фатальным последствиям.

Хочу напомнить вам, что даже после того, как поток (или процесс) завершился, его описатели все равно остаются в силе до тех пор, пока явно не будут закрыты функцией CloseHandle . (Кстати, не забывайте делать это!) Это сделано как раз для того, чтобы в любой момент можно было бы проверить состояние потока.

Итак, функция WaitForMultipleObjects (и ее аналоги) позволяет синхронизировать выполнение потока с состоянием объектов синхронизации, в частности других потоков и процессов.

Специальные объекты ядра

Перейдем к рассмотрению объектов ядра, которые предназначены специально для синхронизации. Это события, семафоры и мьютексы. Кратко рассмотрим каждый из них:

Событие (event)

Пожалуй, самый простой и фундаментальный синхронизирующий объект. Это всего лишь флаг, которому функциями SetEvent/ResetEvent можно задать состояние: сигнализирующее или нейтральное. Событие - это самый удобный способ передать сигнал ожидающему потоку, что некое событие свершилось (потому оно так и называется), и можно продолжать работу. С помощью события мы легко решим проблему синхронизации при инициализации рабочего потока:

// Пусть для простоты хэндл события будет храниться в глобальной переменной:

HANDLE g_hEventInitComplete = NULL; // никогда не оставляем переменную неинициализированной!

{ // код в главном потоке

// создаем событие

g_hEventInitComplete = :: CreateEvent( NULL,

FALSE, // об этом параметре мы еще поговорим

FALSE, // начальное состояние - нейтральное

if (! g_hEventInitComplete ) { /* не забываем про обработку ошибок */ }

// создаем рабочий поток

DWORD idWorkerThread = 0 ;

HANDLE hWorkerThread = :: CreateThread( NULL, 0 , & WorkerThreadProc, NULL, 0 , & idWorkerThread );

if (! hWorkerThread ) { /* обработка ошибки */ }

// ждем сигнала от рабочего потока

DWORD dwWaitResult = :: WaitForSingleObject( g_hEventInitComplete, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* ошибка */ }

// вот теперь можно быть уверенным, что рабочий поток завершил инициализацию.

VERIFY(:: CloseHandle( g_hEventInitComplete )); // не забываем закрывать ненужные объекты

g_hEventInitComplete = NULL;

// функция рабочего потока

DWORD WINAPI WorkerThreadProc( LPVOID _parameter )

InitializeWorker(); // инициализация

// сигнализируем, что инициализация завершена

BOOL isOk = :: SetEvent( g_hEventInitComplete );

if (! isOk ) { /* ошибка */ }

Надо заметить, что существуют две заметно отличающиеся разновидности событий. Мы можем выбрать одну из них с помощью второго параметра функции CreateEvent . Если он TRUE, создается событие, состояние которого управляется только вручную, то есть функциями SetEvent/ResetEvent . Если же он FALSE, будет создано событие с автосбросом. Это означает, что как только некий поток, ожидающий данного события, будет освобожден сигналом от этого события, оно автоматически будет сброшено обратно в нейтральное состояние. Наиболее ярко их отличие проявляется в ситуации, когда одного события ожидают сразу несколько потоков. Событие с ручным управлением подобно стартовому пистолету. Как только оно будет установлено в сигнализирующее состояние, будут освобождены сразу все потоки. Событие же с автосбросом похоже на турникет в метро: оно отпустит лишь один поток и вернется в нейтральное состояние.

Мьютекс (mutex)

По сравнению с событием это более специализированный объект. Обычно он используется для решения такой распространенной задачи синхронизации, как доступ к общему для нескольких потоков ресурсу. Во многом он похож на событие с автосбросом. Основное отличие состоит в том, что он имеет специальную привязку к конкретному потоку. Если мьютекс находится в сигнальном состоянии - это означает, что он свободен и не принадлежит ни одному потоку. Как только некий поток дождался этого мьютекса, последний сбрасывается в нейтральное состояние (здесь он как раз похож на событие с автосбросом), а поток становится его хозяином до тех пор, пока явно не освободит мьютекс функцией ReleaseMutex , или же не завершится. Таким образом, чтобы быть уверенным, что с разделяемыми данными одновременно работает только один поток, следует все места, где происходит такая работа, окружить парой: WaitFor - ReleaseMutex :

HANDLE g_hMutex;

// Пусть хэндл мьютекса хранится в глобальной переменной. Его конечно надо создать заранее, до запуска рабочих потоков. Будем считать, что это уже было сделано.

int iWait = :: WaitForSingleObject( g_hMutex, INFINITE );

switch ( iWait ) {

case WAIT_OBJECT_0: // Все нормально

break ;

case WAIT_ABANDONED: /* Какой-то поток завершился, забыв вызвать ReleaseMutex. Скорее всего, это означает ошибку в вашей программе! Поэтому на всякий случай вставим здесь ASSERT, но в окончательно версии (release) будем считать этот код успешным. */

ASSERT( false );

break ;

default :

// Здесь должна быть обработка ошибки.

// Защищенный мьютексом участок кода.

ProcessCommonData();

VERIFY(:: ReleaseMutex( g_hMutex ));

Чем же мьютекс лучше события с автосбросом? В приведенном примере его также можно было бы использовать, только ReleaseMutex надо было бы заменить на SetEvent . Однако может возникнуть следующая сложность. Чаще всего работать с общими данными приходится в нескольких местах. Что будет, если ProcessCommonData в нашем примере вызовет функцию, которая работает с этими же данными и в которой уже есть своя пара WaitFor - ReleaseMutex (на практике это встречается весьма часто)? Если бы мы использовали событие, программа, очевидно, зависла бы, поскольку внутри защищенного блока событие находится в нейтральном состоянии. Мьютекс же устроен более хитро. Для потока-хозяина он всегда остается в сигнализирующем состоянии, несмотря на то, что для всех остальных потоков он при этом находится в нейтральном. Поэтому если поток захватил мьютекс, повторный вызов WaitFor функции не приведет к блокировке. Более того, в мьютекс встроен еще и счетчик, так что ReleaseMutex должна быть вызвана столько же раз, сколько было вызовов WaitFor . Таким образом, мы можем смело защищать каждый участок кода, работающий с общими данными, парой WaitFor - ReleaseMute x, не волнуясь о том, что этот код может быть вызван рекурсивно. Это делает мьютекс очень простым в использовании инструментом.

Семафор (semaphore)

Еще более специфический объект синхронизации. Должен сознаться, что в моей практике еще не было случая, где он пригодился бы. Семафор предназначен для того, чтобы ограничить максимальное число потоков, одновременно работающих с неким ресурсом. По сути, семафор - это событие со счетчиком. Пока этот счетчик больше нуля, семафор находится в сигнализирующем состоянии. Однако каждый вызов WaitFor уменьшает этот счетчик на единицу до тех пор, пока он не станет равным нулю и семафор перейдет в нейтральное состояние. Подобно мьютексу, для семафора есть функция ReleaseSemaphor , увеличивающая счётчик. Однако в отличие от мьютекса семафор не привязан к потоку и повторный вызов WaitFor/ReleaseSemaphor еще раз уменьшит/увеличит счетчик.

Как можно использовать семафор? Например, с его помощью можно искусственно ограничить многопоточность. Как я уже рассказывал, слишком много одновременно активных потоков может заметно ухудшить производительность всей системы из-за частых переключений контекстов. И если уж нам пришлось создать слишком много рабочих потоков, можно ограничить число одновременно активных потоков числом порядка числа процессоров.

Что ещё можно сказать про объекты синхронизации ядра? Очень удобна возможность давать им имена. Соответствующий параметр есть у всех функций, создающих объекты синхронизации: CreateEvent , CreateMutex , CreateSemaphore . Если вы дважды вызовете, к примеру CreateEvent , оба раза указав одно и тоже непустое имя, то второй раз функция вместо того, чтобы создать новый объект, вернет хэндл уже существующего. Это произойдет, даже если второй вызов был сделан из другого процесса. Последнее очень удобно в тех случаях, когда требуется синхронизировать потоки, принадлежащие разным процессам.

Когда объект синхронизации вам больше не требуется, не забывайте вызвать функцию CloseHandle , о которой я уже упоминал, когда рассказывал о потоках. На самом деле, она не обязательно сразу же удалит объект. Дело в том, что хэндлов у объекта может быть создано несколько, тогда он будет удален только когда будет закрыт последний из них.

Хочу напомнить, что лучший способ гарантировать, чтобы CloseHandle или подобная «очищающая» функция была обязательно вызвана, даже в случае нештатной ситуации, - это поместить ее в деструктор. Об этом, кстати, в свое время неплохо и очень подробно рассказывалось в статье Кирилла Плешивцева «Умный деструктор». В приведённых выше примерах я не использовал этот приём исключительно в учебных целях, чтобы работа функций API была более наглядной. В реальном же коде для очистки следует всегда использовать классы-оболочки с умными деструкторами.

Кстати, с функцией ReleaseMutex и подобными постоянно возникает та же проблема, что и с CloseHandle . Она обязана быть вызвана по окончании работы с общими данными, независимо от того, насколько успешно эта работа была завершена (ведь могло быть выброшено исключение). Последствия «забывчивости» здесь более серьёзны. Если не вызванный CloseHandle приведёт лишь у утечке ресурсов (что тоже плохо!), то не освобожденный мьютекс не позволит другим потокам работать с общим ресурсом до самого завершения давшего сбой потока, что скорее всего не позволит приложению нормально функционировать. Чтобы избежать этого, нам опять поможет специально обученный класс с умным деструктором.

Заканчивая обзор объектов синхронизации, хочется упомянуть об объекте, которого нет в Win32 API. Многие мои коллеги высказывают недоумение, почему в Win32 нет специализированного объекта типа «один пишет, многие читают». Этакий «продвинутый мьютекс», следящий за тем, чтобы получить доступ к общим данным на запись мог бы одновременно только один поток, а только на чтение - сразу несколько. Подобный объект можно найти в UNIX"ах. Некоторые библиотеки, например от Borland, предлагают эмулировать его на основе стандартных объектов синхронизации. Впрочем, реальная польза от таких эмуляций весьма сомнительна. Эффективно такой объект может быть реализован только на уровне ядра операционной системы. Но в ядре Windows подобный объект не предусмотрен.

Почему же разработчики ядра Windows NT не позаботились об этом? Чем мы хуже UNIX? На мой взгляд, ответ заключается в том, что реальной потребности в таком объекте для Windows пока просто не возникало. На обычной однопроцессорной машине, где потоки все равно физически не могут работать одновременно, он будет практически эквивалентен мьютексу. На многопроцессорной машине он может дать выигрыш за счёт того, что позволит читающим потокам работать параллельно. Вместе с тем, реально этот выигрыш станет ощутим лишь когда вероятность «столкновения» читающих потоков велика. Несомненно, что к примеру на 1024-процессорной машине подобный объект ядра будет жизненно необходим. Подобные машины существуют, но это - специализированные системы, работающие под специализированными ОС. Зачастую, такие ОС строят на основе UNIX, вероятно оттуда объект типа «один пишет, многие читают» попал и в более общеупотребительные версии этой системы. Но на привычных нам x86-машинах установлен, как правило, всего один и лишь изредка два процессора. И только самые продвинутые модели процессоров типа Intel Xeon поддерживают 4-х и даже более процессорные конфигурации, но такие системы пока остаются экзотикой. Но даже на такой «продвинутой» системе «продвинутый мьютекс» сможет дать заметный выигрыш в производительности лишь в очень специфических ситуациях.

Таким образом, реализация «продвинутого» мьютекса просто не стоит свеч. На «малопроцессорной» машине он может оказаться даже менее эффективным из-за усложнения логики объекта по сравнению со стандартным мьютексом. Учтите, реализация подобного объекта не так проста, как может показаться на первый взгляд. При неудачной реализации, если читающих потоков будет слишком много, пишущему потоку будет просто «не пробиться» к данным. По этим причинам я также не рекомендую вам пытаться эмулировать такой объект. В реальных приложениях на реальных машинах обычный мьютекс или критическая секция (о которой речь пойдет в следующей части статьи) прекрасно справится с задачей синхронизации доступа к общим данным. Хотя, я полагаю, с развитием ОС Windows объект ядра «один пишет многие читают» рано или поздно появится.

Примечание. На самом деле, объект «один пишет - многие читают» в Windows NT всё-таки есть. Просто, когда я писал эту статью, ещё не знал об этом. Этот объект носит название «ресурсы ядра» и не доступен для програм пользовательского режима, вероятно поэтому и не слишком известен. Подобности о нём можно найти в DDK. Спасибо Константину Манурину, что указал мне на это.

Deadlock

А теперь вернемся к функции WaitForMultipleObjects , точнее к ее третьему параметру, bWaitAll. Я обещал рассказать, почему возможность ожидания сразу нескольких объектов так важна.

Почему необходима функция, позволяющая ожидать один из нескольких объектов, понятно. В отсутствие специальной функции это можно было бы сделать, разве что последовательно проверяя состояние объектов в пустом цикле, что, конечно же, недопустимо. А вот необходимость в специальной функции, позволяющей ожидать момента, когда сразу несколько объектов перейдут в сигнальное состояние, не так очевидна. Действительно, представим следующую характерную ситуацию: нашему потоку в определенный момент необходим доступ сразу к двум наборам общих данных, за каждый из которых отвечает свой мьютекс, назовем их А и В. Казалось бы, поток может сначала подождать, пока освободиться мьютекс А, захватить его, затем подождать освобождения мьютекса В... Вроде бы, можно обойтись парой вызовов WaitForSingleObject . Действительно, это будет работать, но только до тех пор, пока все остальные потоки будут захватывать мьютексы в том же порядке: сначала А, потом В. Что произойдет, если некий поток попытается сделать наоборот: захватить сначала В, затем А? Рано или поздно возникнет ситуация, когда один поток захватил мьютекс А, другой В, первый ждет, когда освободится В, второй А. Понятно, что они этого никогда не дождутся и программа зависнет.

Подобная взаимная блокировка (deadlock) является очень характерной ошибкой. Как и все ошибки, связанные с синхронизацией, она проявляется лишь время от времени и способна испортить программисту немало нервов. В то же время взаимоблокировкой чревата практически любая схема, в которую вовлечены несколько объектов синхронизации. Поэтому, этой проблеме следует уделять особое внимание на этапе проектирования такой схемы.

В приведенном простом примере избежать блокировки довольно легко. Надо потребовать, чтобы все потоки захватывали мьютексы в определенном порядке: сначала А, потом В. Однако в сложной программе, где много различным образом связанных друг с другом объектов, добиться этого обычно бывает не так просто. В блокировку могут быть вовлечены не два, а много объектов и потоков. Поэтому, самый надёжный способ избежать взаимной блокировки в ситуации, когда потоку требуется сразу несколько объектов синхронизации - это захватывать их все одним вызовом функции WaitForMultipleObjects с параметром bWaitAll=TRUE. По правде говоря, при этом мы всего лишь перекладываем проблему взаимных блокировок на ядро операционной системы, но главное - это уже будет не наша забота. Впрочем, в сложной программе со множеством объектов, когда не всегда сразу можно сказать, какие именно из них потребуются для выполнения той или иной операции, свести все вызовы WaitFor в одно место и объединить тоже часто оказывается не просто.

Таким образом, есть два пути избежать возникновения deadlock. Надо либо следить, чтобы объекты синхронизации всегда захватывались потоками строго в одном и том же порядке, либо чтобы они захватывались единым вызовом WaitForMultipleObjects . Последний метод более прост и предпочтителен. Однако на практике, с выполнением и того и другого требования постоянно возникают сложности, приходится сочетать оба этих подхода. Проектирование сложных схем синхронизации часто является весьма нетривиальной задачей.

Пример организации синхронизации

В большинстве типичных ситуаций, вроде тех, что я описывал выше, организовать синхронизацию не составляет труда, достаточно события или мьютекса. Но периодически встречаются более сложные случаи, где решение проблемы не так очевидно. Хочу проиллюстрировать это на конкретном примере из своей практики. Как вы увидите, решение оказалось удивительно простым, но прежде чем я нашел его, мне пришлось перепробовать несколько неудачных вариантов.

Итак, задача. Практически во всех современных менеджерах закачек (download managers), или попросту говоря «качалках» есть возможность ограничения трафика, чтобы работающая в фоновом режиме «качалка» не сильно мешала пользователю лазить по Сети. Я разрабатывал похожую программу, и передо мной была поставлена задача реализовать именно такую «фичу». Моя качалка работала по классической схеме многопоточности, когда каждой задачей, в данном случае скачиванием конкретного файла, занимается отдельный поток. Ограничение трафика должно было быть суммарным для всех потоков. То есть, нужно было добиться, чтобы в течение заданного интервала времени все потоки считывали из своих сокетов не более определенного количества байтов. Просто разделить этот лимит поровну между потоками, очевидно, будет неэффективно, поскольку скачивание файлов может идти весьма неравномерно, один будет качаться быстро, другой медленно. Следовательно, нужен общий для всех потоков счетчик, сколько байт считано, и сколько еще можно считать. Здесь-то и не обойтись без синхронизации. Дополнительную сложность задаче придало требование того, чтобы в любой момент любой из рабочих потоков можно было остановить.

Сформулируем задачу более детально. Систему синхронизации я решил заключить в специальный класс. Вот его интерфейс:

class CQuota {

public : // methods

void Set( unsigned int _nQuota );

unsigned int Request( unsigned int _nBytesToRead, HANDLE _hStopEvent );

void Release( unsigned int _nBytesRevert, HANDLE _hStopEvent );

Периодически, скажем раз в секунду, управляющий поток вызывает метод Set, устанавливая квоту на скачивание. Перед тем, как рабочий поток считает данные, полученные из сети, он вызывает метод Request, который проверяет, что текущая квота не равна нулю, и если да, возвращает не превышающее текущую квоту число байтов, которые можно считать. Квота соответственно уменьшается на это число. Если же при вызове Request квота равна нулю, вызывающий поток должен подождать, пока она не появится. Иногда случается, что реально получено меньше байтов, чем было запрошено, в таком случае поток возвращает часть выделенной ему квоты методом Release. И, как я уже сказал, пользователь в любой момент может дать команду прекратить скачивание. В таком случае ожидание надо прервать независимо от наличия квоты. Для этого используется специальное событие: _hStopEvent. Поскольку задачи можно запускать и останавливать независимо друг от друга, для каждого рабочего потока используется свое событие остановки. Его описатель передается методам Request и Release.

В одном из неудачных вариантов я попробовал использовать сочетание мьютекса, синхронизирующего доступ к классу CQuota и события, сигнализирующего наличие квоты. Однако в эту схему никак не вписывается событие остановки. Если поток желает получить квоту, то его состояние ожидания должно управляться сложным логическим выражением: ((мьютекс И событие наличия квоты) ИЛИ событие остановки). Но WaitForMultipleObjects такого не позволяет, можно объединить несколько объектов ядра либо операцией И, либо ИЛИ, но не вперемешку. Попытка разделить ожидание двумя последовательными вызовами WaitForMultipleObjects неизбежно приводит к deadlock. В общем, этот путь оказался тупиковым.

Не буду больше напускать тумана и расскажу решение. Как я уже говорил, мьютекс очень похож на событие с автосбросом. И здесь мы имеем как раз тот редкий случай, когда удобнее использовать именно его, но не одно а сразу два:

class CQuota {

private: // data

unsigned int m_nQuota;

CEvent m_eventHasQuota;

CEvent m_eventNoQuota;

Одновременно может быть установлено только одно из этих событий. Любой поток, произведший манипуляции с квотой должен установить первое событие, если оставшаяся квота не равна нулю, и второе, если квота исчерпана. Поток, желающий получить квоту, должен подождать первого события. Потоку же, увеличивающего квоту, достаточно дождаться любого из этих событий, поскольку если оба они находятся в сброшенном состоянии, это означает, что с квотой в данный момент работает другой поток. Таким образом, два события выполняют сразу две функции: синхронизации доступа к данным и ожидание. Наконец, поскольку поток ожидает одного из двух событий, сюда легко включается и событие, дающее сигнал на остановку.

Приведу для примера реализацию метода Request. Остальные реализуются аналогично. Я слегка упростил код, использовавшийся в реальном проекте:

unsigned int CQuota:: Request( unsigned int _nRequest, HANDLE _hStopEvent )

if (! _nRequest ) return 0 ;

unsigned int nProvide = 0 ;

HANDLE hEvents[ 2 ];

hEvents[ 0 ] = _hStopEvent; // Событие остановки имеет больший приоритет. Ставим его первым.

hEvents[ 1 ] = m_eventHasQuota;

int iWaitResult = :: WaitForMultipleObjects( 2 , hEvents, FALSE, INFINITE );

switch ( iWaitResult ) {

case WAIT_FAILED:

// ОШИБКА

throw new CWin32Exception;

case WAIT_OBJECT_0:

// Событие остановки. Я обрабатывал его с помощью специального исключения, но ничто не мешает реализовать это как-то иначе.

throw new CStopException;

case WAIT_OBJECT_0+ 1 :

// Событие "квота доступна"

ASSERT( m_nQuota ); // Если сигнал подало это событие, но квоты на самом деле нет, значит где-то мы ошиблись. Надо искать баг!

if ( _nRequest >= m_nQuota ) {

nProvide = m_nQuota;

m_nQuota = 0 ;

m_eventNoQuota. Set();

else {

nProvide = _nRequest;

m_nQuota -= _nRequest;

m_eventHasQuota. Set();

break ;

return nProvide;

Маленькое замечание. Библиотека MFC в том проекте не использовалась, но, как вы наверно уже догадались, я сделал собственный класс CEvent, оболочку вокруг объекта ядра «событие», подобную MFC"шной. Как я уже говорил, такие простые классы-оболочки очень полезны, когда есть некий ресурс (в данном случае объект ядра), который необходимо не забыть освободить по окончанию работы. В остальном же нет разницы, писать ли SetEvent(m_hEvent) или m_event.Set().

Надеюсь, этот пример поможет вам спроектировать собственную схему синхронизации, если вам встретится нетривиальная ситуация. Главное - максимально тщательно проанализируйте вашу схему. Не может ли возникнуть ситуации, в которой она сработает неправильно, в частности, не может ли возникнуть блокировка? Отлавливать такие ошибки в отладчике - обычно безнадежное дело, здесь помогает только детальный анализ.

Итак, мы рассмотрели важнейшее средство синхронизации потоков: объекты синхронизации ядра. Это мощный и универсальный инструмент. С его помощью можно строить даже очень сложные схемы синхронизации. К счастью, такие нетривиальные ситуации встречаются нечасто. Кроме того, за универсальность всегда приходится расплачиваться производительностью. Поэтому во многих случаях стоит использовать другие средства синхронизации потоков, имеющиеся в Windows, такие как критические секции и атомарные операции. Они не так универсальны, зато просты и эффективны. О них мы и поговорим в следующей части.

Последнее обновление: 31.10.2015

Нередко в потоках используются некоторые разделяемые ресурсы, общие для всей программы. Это могут быть общие переменные, файлы, другие ресурсы. Например:

Class Program { static int x=0; static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } }

Здесь у нас запускаются пять потоков, которые работают с общей переменной x. И мы предполагаем, что метод выведет все значения x от 1 до 8. И так для каждого потока. Однако в реальности в процессе работы будет происходить переключение между потоками, и значение переменной x становится непредсказуемым.

Решение проблемы состоит в том, чтобы синхронизировать потоки и ограничить доступ к разделяемым ресурсам на время их использования каким-нибудь потоком. Для этого используется ключевое слово lock . Оператор lock определяет блок кода, внутри которого весь код блокируется и становится недоступным для других потоков до завершения работы текущего потока. И мы можем переделать предыдущий пример следующим образом:

Class Program { static int x=0; static object locker = new object(); static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { lock (locker) { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } } }

Для блокировки с ключевым словом lock используется объект-заглушка, в данном случае это переменная locker . Когда выполнение доходит до оператора lock, объект locker блокируется, и на время его блокировки монопольный доступ к блоку кода имеет только один поток. После окончания работы блока кода, объект locker освобождается и становится доступным для других потоков.

Loading...Loading...