Example: A Simple Producer/Consumer System
Program 8-1 shows how CS objects can be useful. The program also shows how to build protected data structures for storing object state and introduces the concept of an invariant, which is a property of an object's state that is guaranteed (by the proper program implementation) to be true outside a critical code section. Here is a description of the problem.
- There are two threads, a producer and a consumer, that act entirely asynchronously.
- The producer periodically creates messages containing a table of numbers, such as current stock prices, replacing the table contents with each new message.
- The consumer, on request from the user, displays the current data. The requirement is that the displayed data must be the most recent complete set of data, but no data should be displayed twice.
- Do not display data while the producer is updating it, and do not display old data. Note that many produced messages are never used and are "lost." This example is a special case of the pipeline model in which data moves from one thread to the next.
- As an integrity check, the producer also computes a simple checksum[1] of the data in the table, and the consumer validates the checksum to ensure that the data has not been corrupted in transmission from one thread to the next. If the consumer accesses the table while it is still being updated, the table will be invalid; the CS ensures that this does not happen. The message block invariant is that the checksum is correct for the current message contents.
[1] This checksum, an "exclusive or" of the message bits, is for illustration only. Much more sophisticated message digest techniques are available and should be used in a production application.
- The two threads also maintain statistics on the total number of messages produced, consumed, and lost.
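The checksum invariant described above can be sketched in isolation. The following is a minimal, single-threaded illustration in portable C, not part of Program 8-1; the names `table_msg`, `table_fill`, `table_partial_update`, and `table_is_valid`, and the small `TABLE_SIZE`, are hypothetical and chosen only for this sketch. It shows that a completed update satisfies the invariant, while a table caught partway through an update (new data, stale checksum) fails validation in this case.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TABLE_SIZE 8  /* Hypothetical small table; Program 8-1 uses DATA_SIZE. */

typedef struct {      /* Hypothetical stand-in for the message block. */
    uint32_t checksum;
    uint32_t data[TABLE_SIZE];
} table_msg;

/* "Exclusive or" of all table entries. */
uint32_t table_xor(const table_msg *m)
{
    uint32_t sum = 0;
    size_t i;
    assert(m != NULL);
    for (i = 0; i < TABLE_SIZE; i++)
        sum ^= m->data[i];
    return sum;
}

/* Complete update: new data plus a matching checksum.
   The invariant holds when this function returns. */
void table_fill(table_msg *m, uint32_t seed)
{
    size_t i;
    assert(m != NULL);
    for (i = 0; i < TABLE_SIZE; i++)
        m->data[i] = seed + (uint32_t)i;
    m->checksum = table_xor(m);
}

/* Simulates what a reader would see mid-update: the first `count`
   entries are overwritten but the checksum is not yet recomputed. */
void table_partial_update(table_msg *m, uint32_t seed, size_t count)
{
    size_t i;
    assert(m != NULL);
    for (i = 0; i < count && i < TABLE_SIZE; i++)
        m->data[i] = seed + (uint32_t)i;
}

/* Returns 1 if the stored checksum matches the data, 0 otherwise. */
int table_is_valid(const table_msg *m)
{
    return table_xor(m) == m->checksum;
}
```

Calling `table_fill`, then `table_partial_update` with a different seed and a count of 1, leaves a table for which `table_is_valid` returns 0. Because a simple XOR can miss some combinations of changes, this is an illustration of the invariant rather than a reliable corruption detector.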
Program 8-1. simplePC: A Simple Producer and Consumer
/* Chapter 8. simplePC.c */
/* Maintain two threads, a producer and a consumer. */
/* The producer periodically creates checksummed data buffers, */
/* or "message blocks," that the consumer displays when prompted. */
#include "EvryThng.h"
#include <time.h>
#define DATA_SIZE 256
typedef struct msg_block_tag { /* Message block. */
    volatile DWORD f_ready, f_stop; /* Msg ready and stop flags. */
    volatile DWORD sequence; /* Message block sequence number. */
    volatile DWORD nCons, nLost;
    time_t timestamp;
    CRITICAL_SECTION mguard; /* Guard message block structure. */
    DWORD checksum; /* Message contents checksum. */
    DWORD data [DATA_SIZE]; /* Message contents. */
} MSG_BLOCK;
/* Single message block, ready to fill with a new message. */
MSG_BLOCK mblock = { 0, 0, 0, 0, 0 };
DWORD WINAPI produce (void *);
DWORD WINAPI consume (void *);
void MessageFill (MSG_BLOCK *);
void MessageDisplay (MSG_BLOCK *);
DWORD _tmain (DWORD argc, LPTSTR argv [])
{
    DWORD Status, ThId;
    HANDLE produce_h, consume_h;
    /* Initialize the message block CRITICAL SECTION. */
    InitializeCriticalSection (&mblock.mguard);
    /* Create the two threads. */
    produce_h =
        (HANDLE)_beginthreadex (NULL, 0, produce, NULL, 0, &ThId);
    consume_h =
        (HANDLE)_beginthreadex (NULL, 0, consume, NULL, 0, &ThId);
    /* Wait for the producer and consumer to complete. */
    WaitForSingleObject (consume_h, INFINITE);
    WaitForSingleObject (produce_h, INFINITE);
    DeleteCriticalSection (&mblock.mguard);
    _tprintf (_T ("Producer and consumer threads terminated\n"));
    _tprintf (_T ("Produced: %d, Consumed: %d, Known Lost: %d\n"),
        mblock.sequence, mblock.nCons, mblock.nLost);
    return 0;
}
DWORD WINAPI produce (void *arg)
/* Producer thread -- create new messages at random intervals. */
{
    srand ((DWORD) time (NULL)); /* Seed the random # generator. */
    while (!mblock.f_stop) {
        /* Random delay. */
        Sleep (rand () / 100);
        /* Get the buffer, fill it. */
        EnterCriticalSection (&mblock.mguard);
        __try {
            if (!mblock.f_stop) {
                mblock.f_ready = 0;
                MessageFill (&mblock);
                mblock.f_ready = 1;
                mblock.sequence++;
            }
        }
        __finally { LeaveCriticalSection (&mblock.mguard); }
    }
    return 0;
}
DWORD WINAPI consume (void *arg)
{
    DWORD ShutDown = 0;
    TCHAR command, extra; /* TCHAR, to match the generic-text _tscanf. */
    /* Consume the NEXT message when prompted by the user. */
    while (!ShutDown) { /* Only thread accessing stdin, stdout. */
        _tprintf (_T ("\n**Enter 'c' for consume; 's' to stop: "));
        _tscanf (_T ("%c%c"), &command, &extra);
        if (command == 's') {
            EnterCriticalSection (&mblock.mguard);
            ShutDown = mblock.f_stop = 1;
            LeaveCriticalSection (&mblock.mguard);
        } else if (command == 'c') { /* Get new buffer to consume. */
            EnterCriticalSection (&mblock.mguard);
            __try {
                if (mblock.f_ready == 0)
                    _tprintf (_T ("No new messages. Try again.\n"));
                else {
                    MessageDisplay (&mblock);
                    mblock.nCons++;
                    mblock.nLost = mblock.sequence - mblock.nCons;
                    mblock.f_ready = 0; /* No new messages. */
                }
            }
            __finally { LeaveCriticalSection (&mblock.mguard); }
        } else {
            _tprintf (_T ("Illegal command. Try again.\n"));
        }
    }
    return 0;
}
void MessageFill (MSG_BLOCK *mblock)
{
    /* Fill the message buffer, including checksum and timestamp. */
    DWORD i;
    mblock->checksum = 0;
    for (i = 0; i < DATA_SIZE; i++) {
        mblock->data [i] = rand ();
        mblock->checksum ^= mblock->data [i];
    }
    mblock->timestamp = time (NULL);
    return;
}
void MessageDisplay (MSG_BLOCK *mblock)
{
    /* Display message buffer, timestamp, and validate checksum. */
    DWORD i, tcheck = 0;
    for (i = 0; i < DATA_SIZE; i++)
        tcheck ^= mblock->data [i];
    _tprintf (_T ("\nMessage number %d generated at: %s"),
        mblock->sequence, _tctime (&(mblock->timestamp)));
    _tprintf (_T ("First and last entries: %x %x\n"),
        mblock->data [0], mblock->data [DATA_SIZE - 1]);
    if (tcheck == mblock->checksum)
        _tprintf (_T ("GOOD ->Checksum was validated.\n"));
    else
        _tprintf (_T ("BAD ->Checksum failed. Message corrupted.\n"));
    return;
}
Comments on the Simple Producer/Consumer Example
This example illustrates several points and programming conventions that will be important throughout this and the following chapters.
- The CRITICAL_SECTION object is a part of the object (the message block) that it protects.
- Every access to the message block is performed in a critical code section.
- The variables that are accessed by the different threads are volatile.
- Termination handlers are used to ensure that the CS is released. This technique, while not essential, helps to ensure that later code modifications do not inadvertently skip the LeaveCriticalSection call. Also, the termination handler is limited to C and should not be used with C++.
- The MessageFill and MessageDisplay functions are called only within critical code sections, and both functions use local, rather than global, storage for their computations. Incidentally, these two functions will be used in subsequent examples but will not be listed again.
- The producer does not have a useful way to tell the consumer that there is a new message, so the consumer simply has to wait until the ready flag, indicating a new message, is set. Event kernel objects will give us a way to eliminate this inefficiency.
- One of the invariant properties that this program ensures is that the message block checksum is always correct, outside the critical code sections. Another invariant property is:
0 <= nLost + nCons <= sequence
There will be more about this important concept later.
- The producer thread only knows that it should stop by examining a flag in the message block, where the flag is set by the consumer. Because one thread cannot send any sort of signal to another and TerminateThread has undesirable side effects, this technique is the simplest way to stop another thread. Of course, the threads must cooperate for this method to be effective. This solution requires, however, that the thread must not be blocked so that it can test the flag; Chapter 10 shows how to cancel a blocked thread.
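The counter invariant and the role of the ready flag can be checked with a small single-threaded model. The sketch below is an illustration only, written in portable C rather than with the Windows API; `pc_state`, `pc_produce`, `pc_consume`, and `pc_invariant_holds` are hypothetical names, and each function stands in for the body of one critical code section in Program 8-1 (the work that the program performs between EnterCriticalSection and LeaveCriticalSection).

```c
#include <assert.h>

typedef struct {        /* Hypothetical model of the shared counters. */
    unsigned f_ready;   /* 1 when an unconsumed message is available. */
    unsigned sequence;  /* Messages produced so far. */
    unsigned nCons;     /* Messages consumed so far. */
    unsigned nLost;     /* Messages known to be lost. */
} pc_state;

/* The invariant: 0 <= nLost + nCons <= sequence.
   The lower bound is automatic for unsigned counters. */
int pc_invariant_holds(const pc_state *s)
{
    return s->nLost + s->nCons <= s->sequence;
}

/* Models the producer's critical section. */
void pc_produce(pc_state *s)
{
    assert(pc_invariant_holds(s));  /* Holds on entry. */
    s->f_ready = 0;
    /* The message contents would be filled in here. */
    s->f_ready = 1;
    s->sequence++;
    assert(pc_invariant_holds(s));  /* Restored before leaving. */
}

/* Models the consumer's critical section.
   Returns 1 if a message was consumed, 0 if none was ready. */
int pc_consume(pc_state *s)
{
    assert(pc_invariant_holds(s));
    if (!s->f_ready)
        return 0;                   /* Nothing new; never show data twice. */
    s->nCons++;
    s->nLost = s->sequence - s->nCons;
    s->f_ready = 0;
    assert(pc_invariant_holds(s));
    return 1;
}
```

Starting from a zeroed `pc_state`, three calls to `pc_produce` followed by one `pc_consume` leave `sequence` at 3, `nCons` at 1, and `nLost` at 2; a second `pc_consume` returns 0 because the ready flag was cleared. The model says nothing about thread safety, which in the program comes entirely from the CRITICAL_SECTION.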
The CRITICAL_SECTION object is a powerful synchronization mechanism, yet it does not provide all the functionality needed. The inability to signal another thread was noted earlier, and there is also no time-out capability. The Windows kernel synchronization objects address these limitations and more.
