Example: A Producer/Consumer System
This example extends Program 8-1 so that the consumer can wait until there is an available message. This eliminates the problem that requires the consumer to try again if a new message is not available. The resulting program, Program 8-2, is called eventPC.Notice that the solution uses a mutex rather than a CRITICAL_SECTION; there is no reason for this other than to illustrate mutex usage. The use of an auto-reset event and SetEvent in the producer are, however, essential for correct operation to ensure that just one thread is released.Also notice how the mutex and event are both associated with the message block data structure. The mutex enforces the critical code section for accessing the data structure object, and the event is used to signal the fact that there is a new message. Generalizing, the mutex ensures the object's invariants, and the event signals that the object is in a specified state. This basic technique is used extensively in later chapters.
Program 8-2. eventPC: A Signaling Producer and Consumer
Note:
/* Chapter 8. eventPC.c */
/* Maintain two threads, a producer and a consumer. */
/* The producer periodically creates checksummed data buffers, */
/* or "message blocks," signaling the consumer that a message */
/* is ready. The consumer displays when prompted. */
#include "EvryThng.h"
#include <time.h>
#define DATA_SIZE 256
typedef struct msg_block_tag { /* Message block. */
volatile DWORD f_ready, f_stop;
/* Ready state flag, stop flag. */
volatile DWORD sequence; /* Message block sequence number. */
volatile DWORD nCons, nLost;
time_t timestamp;
HANDLE mguard; /* Mutex to guard the message block structure. */
HANDLE mready; /* "Message ready" event. */
DWORD checksum; /* Message contents checksum. */
DWORD data [DATA_SIZE]; /* Message contents. */
} MSG_BLOCK;
/* ... */
DWORD _tmain (DWORD argc, LPTSTR argv [])
{
DWORD Status, ThId;
HANDLE produce_h, consume_h;
/* Initialize the message block mutex and event (auto-reset). */
mblock.mguard = CreateMutex (NULL, FALSE, NULL);
mblock.mready = CreateEvent (NULL, FALSE, FALSE, NULL);
/* Create producer and consumer; wait until they terminate. */
/* ... As in Program 91 ... */
CloseHandle (mblock.mguard);
CloseHandle (mblock.mready);
_tprintf (_T ("Producer and consumer threads terminated\n"));
_tprintf (_T ("Produced: %d, Consumed: %d, Known Lost: %d\n"),
mblock.sequence, mblock.nCons, mblock.nLost);
return 0;
}
DWORD WINAPI produce (void *arg)
/* Producer thread -- create new messages at random intervals. */
{
srand ((DWORD)time(NULL)); /* Seed the random # generator. */
while (!mblock.f_stop) {
/* Random delay. */
Sleep (rand () / 10); /* Wait a long period for next message. */
/* Get the buffer, fill it. */
WaitForSingleObject (mblock.mguard, INFINITE);
__try {
if (!mblock.f_stop) {
mblock.f_ready = 0;
MessageFill (&mblock);
mblock.f_ready = 1;
mblock.sequence++;
SetEvent(mblock.mready); /* Signal "message ready." */
}
}
__finally { ReleaseMutex (mblock.mguard); }
}
return 0;
}
DWORD WINAPI consume (void *arg)
{
DWORD ShutDown = 0;
CHAR command, extra;
/* Consume the NEXT message when prompted by the user. */
while (!ShutDown) { /* Only thread accessing stdin, stdout. */
_tprintf (_T ("\n**Enter 'c' for consume; 's' to stop: "));
_tscanf ("%c%c", &command, &extra);
if (command == 's') {
WaitForSingleObject (mblock.mguard, INFINITE);
ShutDown = mblock.f_stop = 1;
ReleaseMutex (mblock.mguard);
} else if (command == 'c') { /* Get new buffer to consume. */
WaitForSingleObject (mblock.mready, INFINITE);
WaitForSingleObject (mblock.mguard, INFINITE);
__try {
if (!mblock.f_ready) _leave;
/* Wait for the event indicating a message is ready. */
MessageDisplay (&mblock);
mblock.nCons++;
mblock.nLost = mblock.sequence - mblock.nCons;
mblock.f_ready = 0; /* No new messages are ready. */
}
__finally { ReleaseMutex (mblock.mguard); }
} else {
_tprintf (_T ("Illegal command. Try again.\n"));
}
}
return 0;
}
There is a possibility that the consumer, having received the message ready event, will not actually process the current message if the producer generates yet another message before the consumer acquires the mutex. This behavior could cause a consumer to process a single message twice if it were not for the test at the start of the consumer's __try block. This and similar issues will be addressed in Chapter 10.
Review: Windows Synchronization Objects
Table 8-2 reviews and compares the essential features of the Windows synchronization objects.
CRITICAL_SECTION | Mutex | Semaphore | Event | |
---|---|---|---|---|
Named, Securable Synchronization Object | No | Yes | Yes | Yes |
Accessible from Multiple Processes | No | Yes | Yes | Yes |
Synchronization | Enter | Wait | Wait | Wait |
Release | Leave | Release or abandoned | Any thread can release. | Set, pulse |
Ownership | One thread at a time. The owning thread can enter multiple times without blocking. | One thread at a time. The owning thread can wait multiple times without blocking. | N/A. Many threads at a time, up to the maximum count. | N/A. Any thread can set or pulse an event. |
Effect of Release | One waiting thread can enter. | One waiting thread can gain ownership after last release. | Multiple threads can proceed, depending on release count. | One or several waiting threads will proceed after a set or pulse. |
Message and Object Waiting
The function MsgWaitForMultipleObjects is similar to WaitForMultipleObjects. Use this function to allow a thread to process user interface events, such as mouse clicks, while waiting on synchronization objects.
