A Queue Object
So far, we have associated a single event with each mutex, but in general there might be more than one condition variable predicate. For example, in implementing a first in, first out (FIFO) queue, a thread that removes an element from the queue needs to wait on an event signifying that the queue is not empty, while a thread placing an element in the queue must wait until the queue is not full. The solution is to provide two events, one for each condition.Exercise 107 suggests making the conversion to Windows style.10-5 show the queue functions and a program that uses them.
Program 10-3. SynchObj.h: Part 2Queue Definitions
Exercise 108 suggests doing this.
/* Definitions of synchronized, general bounded queue structure. */
/* Queues are implemented as arrays with indices to youngest */
/* and oldest messages, with wrap around. */
/* Each queue also contains a guard mutex and */
/* "not empty" and "not full" condition variables. */
/* Finally, there is a pointer to an array of messages of */
/* arbitrary type. */
typedef struct queue_tag { /* General-purpose queue. */
HANDLE q_guard; /* Guard the message block. */
HANDLE q_ne; /* Queue is not empty. MR event
(AR for "signal model"). */
HANDLE q_nf; /* Queue is not full. MR event.
(AR for "signal model"). */
volatile DWORD q_size; /* Queue max size. */
volatile DWORD q_first; /* Index of oldest message. */
volatile DWORD q_last; /* Index of youngest message. */
volatile DWORD q_destroyed; /* Q receiver has terminated. */
PVOID msg_array; /* Array of q_size messages. */
} queue_t;
/* Queue management functions. */
DWORD q_initialize (queue_t *, DWORD, DWORD);
DWORD q_destroy (queue_t *);
DWORD q_destroyed (queue_t *);
DWORD q_empty (queue_t *);
DWORD q_full (queue_t *);
DWORD q_get (queue_t *, PVOID, DWORD, DWORD);
DWORD q_put (queue_t *, PVOID, DWORD, DWORD);
DWORD q_remove (queue_t *, PVOID, DWORD);
DWORD q_insert (queue_t *, PVOID, DWORD);
Program 10-4. QueueObj.c: The Queue Management Functions
/* Chapter 10. QueueObj.c. */
/* Queue function */
#include "EvryThng.h"
#include "SynchObj.h"
/* Finite bounded queue management functions. */
DWORD q_get (queue_t *q, PVOID msg, DWORD msize, DWORD MaxWait)
{
if (q_destroyed (q)) return 1;
WaitForSingleObject (q->q_guard, INFINITE);
while (q_empty (q) {
SignalObjectAndWait (q->q_guard, q->q_ne, INFINITE, FALSE);
WaitForSingleObject (q->q_guard, INFINITE);
}
/* Remove the message from the queue. */
q_remove (q, msg, msize);
/* Signal that queue is not full as we've removed a message. */
PulseEvent (q->q_nf);
ReleaseMutex (q->q_guard);
return 0;
}
DWORD q_put (queue_t *q, PVOID msg, DWORD msize, DWORD MaxWait)
{
if (q_destroyed (q)) return 1;
WaitForSingleObject (q->q_guard, INFINITE);
while (q_full (q) {
SignalObjectAndWait (q->q_guard, q->q_nf, INFINITE, FALSE);
WaitForSingleObject (q->q_guard, INFINITE);
}
/* Put the message in the queue. */
q_insert (q, msg, msize);
/* Signal that queue is not empty; we've inserted a message. */
PulseEvent (q->q_ne); /* Broadcast CV model. */
ReleaseMutex (q->q_guard);
return 0;
}
DWORD q_initialize (queue_t *q, DWORD msize, DWORD nmsgs)
{
/* Initialize queue, including its mutex and events. */
/* Allocate storage for all messages. */
q->q_first = q->q_last = 0;
q->q_size = nmsgs;
q->q_destroyed = 0;
q->q_guard = CreateMutex (NULL, FALSE, NULL);
q->q_ne = CreateEvent (NULL, TRUE, FALSE, NULL);
q->q_nf = CreateEvent (NULL, TRUE, FALSE, NULL);
if ((q->msg_array = calloc (nmsgs, msize)) == NULL) return 1;
return 0; /* No error. */
}
DWORD q_destroy (queue_t *q)
{
if (q_destroyed (q)) return 1;
/* Free all the resources created by q_initialize. */
WaitForSingleObject (q->q_guard, INFINITE);
q->q_destroyed = 1;
free (q->msg_array);
CloseHandle (q->q_ne);
CloseHandle (q->q_nf);
ReleaseMutex (q->q_guard);
CloseHandle (q->q_guard);
return 0;
}
DWORD q_destroyed (queue_t *q)
{
return (q->q_destroyed);
}
DWORD q_empty (queue_t *q)
{
return (q->q_first == q->q_last);
}
DWORD q_full (queue_t *q)
{
return ((q->q_last - q->q_first) == 1 ||
(q->q_first == q->q_size-1 && q->q_last == 0));
}
DWORD q_remove (queue_t *q, PVOID msg, DWORD msize)
{
char *pm;
pm = (char *)q->msg_array;
/* Remove oldest ("first") message. */
memcpy (msg, pm + (q->q_first * msize), msize);
q->q_first = ((q->q_first + 1) % q->q_size);
return 0; /* No error. */
}
DWORD q_insert (queue_t *q, PVOID msg, DWORD msize)
{
char *pm;
pm = (char *) q->msg_array;
/* Add a new youngest ("last") message. */
if (q_full (q)) return 1; /* Error - Q is full. */
memcpy (pm + (q->q_last * msize), msg, msize);
q->q_last = ((q->q_last + 1) % q->q_size);
return 0;
}
Comments on the Queue Management Functions and Performance
Appendix C contains performance data, based on Program 10-5, which uses the queue management functions. The following comments that refer to performance are based on that data. The book's Web site contains code for all the implementation variations.
- This implementation uses the broadcast model (manual-reset/PulseEvent) to allow for the general case in which multiple messages may be requested or created by a single thread. The signal model (auto-reset/SetEvent) will work if this generality is not required, and there are significant performance advantages because only a single thread is released to test the predicate. The Web site contains QueueObj_Sig.c, a source file that uses signaling rather than broadcasting.
- Using a CRITICAL_SECTION, rather than a mutex, to protect the queue object can also improve performance. However, you must use an EnterCriticalSection followed by an event wait rather than SignalObjectAndWait. Two files provided on the Web site, QueueObjCS.c and QueueObjCS_Sig.c, illustrate this alternative approach.
- QueueObj_noSOAW.c and QueueObj_noSOAW.c are two additional source files provided on the Web site that build executable programs that will run on Windows 9x and do not use SignalObjectAndWait.
- Appendix C also shows the nonlinear performance impact when a large number of threads contend for a queue. The Web site contains projects for each of the alternative strategies; the projects are variations of the ThreeStage pipeline system described in the following sections.
- In summary, the queues can be extended to be process-sharable and to get and put multiple messages atomically. Some performance gains may be realized, however, by using the signal model, a CRITICAL_SECTION, or SignalObjectAndWait. Appendix C gives some performance results.
