Example: Using Queues in a Multistage Pipeline
The boss/worker model, along with its variations, is one popular multithreaded programming model, and Program 8-2 is a simple producer/consumer model, a special case of the more general pipeline model.
Another important special case consists of a single boss thread that produces work items for a limited number of worker threads, placing the work items in a queue. This technique can be helpful when creating a scalable server that has a large number (perhaps thousands) of clients and it is not feasible to have a worker thread for each client. Chapter 14 discusses the scalable server problem in the context of I/O completion ports.
In the pipeline model, each thread, or group of threads, does some work on work items, such as messages, and passes the work items on to other threads for additional processing. A manufacturing assembly line is analogous to a thread pipeline. Queues are an ideal mechanism for pipeline implementations.
The producers in Program 10-5 create unit messages as in Program 8-2, except that each message has a destination field indicating which consumer thread is to receive the message; each producer communicates with a single consumer. The number of producer/consumer pairs is a command line parameter. The producer then sends the unit message to the transmitter thread by placing the message in the transmission queue. If the queue is full, the producer waits until the queue state changes.
Figure 10-1 shows the system. Notice how it models networking communication where messages between several sender/receiver pairs are combined and transmitted over a shared facility.
Figure 10-1. Multistage Pipeline
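The way messages from several pairs are combined for the shared facility and then separated again by destination can be illustrated with a minimal, single-threaded sketch in portable C. It is an illustration only and not part of Program 10-5: demo_msg_t, demo_block_t, demo_pack, demo_route, and demo_run are hypothetical names, the message carries only a source, a destination, and a sequence number, and plain function calls stand in for the thread-safe queues that the real program uses.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BLOCK_SIZE 5   /* Mirrors TBLOCK_SIZE: messages per compound message. */

/* Hypothetical, simplified unit message. */
typedef struct {
    unsigned source;
    unsigned destination;   /* Index of the consumer that should receive it. */
    unsigned sequence;
} demo_msg_t;

/* Hypothetical compound message, modeled on t2r_msg_t. */
typedef struct {
    unsigned num_msgs;
    demo_msg_t messages[BLOCK_SIZE];
} demo_block_t;

/* Transmitter step: pack up to BLOCK_SIZE messages into one block. */
/* Returns the number of messages packed. */
size_t demo_pack(const demo_msg_t *in, size_t n_in, demo_block_t *block)
{
    size_t n = n_in < BLOCK_SIZE ? n_in : BLOCK_SIZE;
    assert(in != NULL && block != NULL);
    block->num_msgs = (unsigned)n;
    memcpy(block->messages, in, n * sizeof(demo_msg_t));
    return n;
}

/* Receiver step: unblock the compound message and count one */
/* delivery for each message's destination consumer. */
void demo_route(const demo_block_t *block, unsigned *per_consumer,
                unsigned n_consumers)
{
    for (unsigned im = 0; im < block->num_msgs; im++) {
        unsigned ic = block->messages[im].destination;
        if (ic < n_consumers) per_consumer[ic]++;
    }
}

/* Simulate npc producers that each create goal messages, interleaved. */
/* Returns the number of compound messages "transmitted". */
unsigned demo_run(unsigned npc, unsigned goal, unsigned *per_consumer)
{
    demo_msg_t staged[BLOCK_SIZE];
    demo_block_t block;
    unsigned n_staged = 0, n_blocks = 0;

    for (unsigned seq = 0; seq < goal; seq++) {
        for (unsigned p = 0; p < npc; p++) {
            staged[n_staged].source = p;
            staged[n_staged].destination = p;   /* Paired consumer. */
            staged[n_staged].sequence = seq;
            n_staged++;
            if (n_staged == BLOCK_SIZE) {       /* Block is full: send it. */
                demo_pack(staged, n_staged, &block);
                demo_route(&block, per_consumer, npc);
                n_blocks++;
                n_staged = 0;
            }
        }
    }
    if (n_staged > 0) {                         /* Flush a partial block. */
        demo_pack(staged, n_staged, &block);
        demo_route(&block, per_consumer, npc);
        n_blocks++;
    }
    return n_blocks;
}
```

With three producer/consumer pairs and a goal of four messages each, the twelve messages travel in three compound messages (5, 5, and 2 messages), and each consumer still receives exactly its own four.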

Program 10-5. ThreeStage.c: A Multistage Pipeline
/* Chapter 10. ThreeStage.c */
/* Three-stage producer/consumer system. */
/* Usage: ThreeStage npc goal. */
/* Start up "npc" paired producer and consumer threads. */
/* Each producer must produce a total of */
/* "goal" messages, where each message is tagged */
/* with the consumer that should receive it. */
/* Messages are sent to a "transmitter thread," which performs */
/* additional processing before sending message groups to the */
/* "receiver thread." Finally, the receiver thread sends */
/* the messages to the consumer threads. */
#include "EvryThng.h"
#include "SynchObj.h"
#include "messages.h"
#include <time.h>
#define DELAY_COUNT 1000
#define MAX_THREADS 1024
/* Q lengths and blocking factors. These are arbitrary and */
/* can be adjusted for performance tuning. The current values are */
/* not well balanced. */
#define TBLOCK_SIZE 5 /* Trsmttr combines 5 messages at a time. */
#define TBLOCK_TIMEOUT 50 /* Trsmttr time-out waiting for messages. */
#define P2T_QLEN 10 /* Producer to transmitter queue length. */
#define T2R_QLEN 4 /* Transmitter to receiver queue length. */
#define R2C_QLEN 4 /* Receiver to consumer queue length --
there is one such queue for each consumer. */
DWORD WINAPI producer (PVOID);
DWORD WINAPI consumer (PVOID);
DWORD WINAPI transmitter (PVOID);
DWORD WINAPI receiver (PVOID);
typedef struct _THARG {
    volatile DWORD thread_number;
    volatile DWORD work_goal; /* Used by producers. */
    volatile DWORD work_done; /* Used by producers & consumers. */
    char future [8];
} THARG;
/* Grouped messages sent by the transmitter to receiver. */
typedef struct t2r_msg_tag {
    volatile DWORD num_msgs; /* Number of messages contained. */
    msg_block_t messages [TBLOCK_SIZE];
} t2r_msg_t;
queue_t p2tq, t2rq, *r2cq_array;
static volatile DWORD ShutDown = 0;
static DWORD EventTimeout = 50;
int _tmain (int argc, LPTSTR argv [])
{
    DWORD tstatus, nthread, ithread, goal, thid;
    HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;
    THARG *producer_arg, *consumer_arg;
    /* Both command line arguments are required. */
    if (argc < 3) {
        _tprintf (_T ("Usage: ThreeStage npc goal\n"));
        return 1;
    }
    nthread = _ttoi (argv [1]);
    goal = _ttoi (argv [2]);
    producer_th = malloc (nthread * sizeof (HANDLE));
    producer_arg = calloc (nthread, sizeof (THARG));
    consumer_th = malloc (nthread * sizeof (HANDLE));
    consumer_arg = calloc (nthread, sizeof (THARG));
    q_initialize (&p2tq, sizeof (msg_block_t), P2T_QLEN);
    q_initialize (&t2rq, sizeof (t2r_msg_t), T2R_QLEN);
    /* Allocate, initialize Rec-Cons queue for each consumer. */
    r2cq_array = calloc (nthread, sizeof (queue_t));
    for (ithread = 0; ithread < nthread; ithread++) {
        /* Initialize r2c queue for this consumer thread. */
        q_initialize (&r2cq_array [ithread], sizeof (msg_block_t),
            R2C_QLEN);
        /* Fill in the thread arg. */
        consumer_arg [ithread].thread_number = ithread;
        consumer_arg [ithread].work_goal = goal;
        consumer_arg [ithread].work_done = 0;
        consumer_th [ithread] = (HANDLE) _beginthreadex (NULL, 0,
            consumer, (PVOID) &consumer_arg [ithread], 0, &thid);
        producer_arg [ithread].thread_number = ithread;
        producer_arg [ithread].work_goal = goal;
        producer_arg [ithread].work_done = 0;
        producer_th [ithread] = (HANDLE) _beginthreadex (NULL, 0,
            producer, (PVOID) &producer_arg [ithread], 0, &thid);
    }
    transmitter_th = (HANDLE) _beginthreadex (NULL, 0,
        transmitter, NULL, 0, &thid);
    receiver_th = (HANDLE) _beginthreadex (NULL, 0,
        receiver, NULL, 0, &thid);
    _tprintf (_T ("BOSS: All threads are running\n"));
    /* Wait for the producers to complete. */
    for (ithread = 0; ithread < nthread; ithread++) {
        WaitForSingleObject (producer_th [ithread], INFINITE);
        _tprintf (_T ("BOSS: Producer %d produced %d work units\n"),
            ithread, producer_arg [ithread].work_done);
    }
    /* Producers have completed their work. */
    _tprintf (_T ("BOSS: All producers have completed their work.\n"));
    /* Wait for the consumers to complete. */
    for (ithread = 0; ithread < nthread; ithread++) {
        WaitForSingleObject (consumer_th [ithread], INFINITE);
        _tprintf (_T ("BOSS: consumer %d consumed %d work units\n"),
            ithread, consumer_arg [ithread].work_done);
    }
    _tprintf (_T ("BOSS: All consumers have completed their work.\n"));
    ShutDown = 1; /* Set a shutdown flag. */
    /* Terminate, and wait for, the transmitter and receiver. */
    /* This thread termination is OK as the transmitter and */
    /* receiver cannot hold any resources other than a mutex, */
    /* which will be abandoned. Can you do this a better way? */
    TerminateThread (transmitter_th, 0);
    TerminateThread (receiver_th, 0);
    WaitForSingleObject (transmitter_th, INFINITE);
    WaitForSingleObject (receiver_th, INFINITE);
    q_destroy (&p2tq);
    q_destroy (&t2rq);
    for (ithread = 0; ithread < nthread; ithread++)
        q_destroy (&r2cq_array [ithread]);
    free (r2cq_array);
    free (producer_th); free (consumer_th);
    free (producer_arg); free (consumer_arg);
    _tprintf (_T ("System has finished. Shutting down\n"));
    return 0;
}
DWORD WINAPI producer (PVOID arg)
{
    THARG * parg;
    DWORD ithread, tstatus;
    msg_block_t msg;
    parg = (THARG *) arg;
    ithread = parg->thread_number;
    while (parg->work_done < parg->work_goal) {
        /* Produce work units until the goal is satisfied. */
        /* Messages receive source, destination address, which are */
        /* the same here but could, in general, be different. */
        delay_cpu (DELAY_COUNT * rand () / RAND_MAX);
        message_fill (&msg, ithread, ithread, parg->work_done);
        /* Put the message in the queue. */
        tstatus = q_put (&p2tq, &msg, sizeof (msg), INFINITE);
        parg->work_done++;
    }
    return 0;
}
DWORD WINAPI transmitter (PVOID arg)
{
    /* Obtain multiple producer messages, combining into a single */
    /* compound message for the receiver. */
    DWORD tstatus, im;
    t2r_msg_t t2r_msg = {0};
    msg_block_t p2t_msg;
    while (!ShutDown) {
        t2r_msg.num_msgs = 0;
        /* Pack the messages for transmission to the receiver. */
        for (im = 0; im < TBLOCK_SIZE; im++) {
            tstatus = q_get (&p2tq, &p2t_msg,
                sizeof (p2t_msg), INFINITE);
            if (tstatus != 0) break;
            memcpy (&t2r_msg.messages [im], &p2t_msg, sizeof (p2t_msg));
            t2r_msg.num_msgs++;
        }
        tstatus = q_put (&t2rq, &t2r_msg, sizeof (t2r_msg), INFINITE);
        if (tstatus != 0) return tstatus;
    }
    return 0;
}
DWORD WINAPI receiver (PVOID arg)
{
    /* Obtain compound messages from the transmitter; unblock */
    /* them and transmit to the designated consumer. */
    DWORD tstatus, im, ic;
    t2r_msg_t t2r_msg;
    msg_block_t r2c_msg;
    while (!ShutDown) {
        tstatus = q_get (&t2rq, &t2r_msg, sizeof (t2r_msg), INFINITE);
        if (tstatus != 0) return tstatus;
        /* Distribute the messages to the proper consumer. */
        for (im = 0; im < t2r_msg.num_msgs; im++) {
            memcpy (&r2c_msg, &t2r_msg.messages [im], sizeof (r2c_msg));
            ic = r2c_msg.destination; /* Destination consumer. */
            tstatus = q_put (&r2cq_array [ic], &r2c_msg,
                sizeof (r2c_msg), INFINITE);
            if (tstatus != 0) return tstatus;
        }
    }
    return 0;
}
DWORD WINAPI consumer (PVOID arg)
{
    THARG * carg;
    DWORD tstatus, ithread;
    msg_block_t msg;
    queue_t *pr2cq;
    carg = (THARG *) arg;
    ithread = carg->thread_number;
    pr2cq = &r2cq_array [ithread];
    while (carg->work_done < carg->work_goal) {
        /* Receive and display (optionally -- not shown) messages. */
        tstatus = q_get (pr2cq, &msg, sizeof (msg), INFINITE);
        if (tstatus != 0) return tstatus;
        carg->work_done++;
    }
    return 0;
}
Comments on the Multistage Pipeline
There are several things to notice about this implementation, some of which are mentioned in the program comments. Exercises 10-6, 10-7, and 10-10 suggest addressing these issues.
- A significant objection could be the way that the main thread terminates the transmitter and receiver threads. A solution would be to use a time-out value in the inner transmitter and receiver loops and shut down when the global shutdown flag is detected. Another approach would be to cancel the threads, as described later in this chapter.
- Note the symmetry between the transmitter and receiver threads. As with the queue implementation, this facilitates program design, debugging, and maintenance.
- The implementation is not well balanced in terms of the match of the message production rates, the pipeline sizes, and the transmitter-receiver blocking factor.
- This implementation (Program 10-4) uses mutexes to guard the queues. Experiments with CRITICAL_SECTIONs show no significant speed-up on a single-processor system (see Appendix C). The CS version is included on the Web site as ThreeStageCS.c. SignalObjectAndWait provides similar performance improvements.
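The first suggestion in the list above, a time-out in the stage loop combined with a check of the shutdown flag, can be sketched as follows. This is a minimal illustration under stated assumptions, not the book's solution: it uses POSIX threads so that it is portable and self-contained, rather than the Windows queue functions of Program 10-5, and demo_queue_t, demo_put, demo_get, demo_stage, and demo_run_stage are hypothetical names. The stage exits only after a time-out finds the queue empty and the flag set, so no thread needs to be terminated forcibly.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L   /* For clock_gettime. */
#endif
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <time.h>

#define DEMO_QLEN 4          /* Hypothetical queue length. */
#define DEMO_TIMEOUT_MS 50   /* Hypothetical time-out, in milliseconds. */

typedef struct {
    pthread_mutex_t guard;
    pthread_cond_t not_empty, not_full;
    int items[DEMO_QLEN];
    unsigned count, head;
} demo_queue_t;

typedef struct { demo_queue_t *q; int processed; } demo_stage_t;

static atomic_int demo_shutdown;   /* Plays the role of ShutDown. */

/* Blocking put: wait while the queue is full. */
static void demo_put(demo_queue_t *q, int item)
{
    pthread_mutex_lock(&q->guard);
    while (q->count == DEMO_QLEN)
        pthread_cond_wait(&q->not_full, &q->guard);
    assert(q->count < DEMO_QLEN);
    q->items[(q->head + q->count) % DEMO_QLEN] = item;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->guard);
}

/* Get with a time-out: 0 on success, ETIMEDOUT if nothing arrived. */
static int demo_get(demo_queue_t *q, int *item, long timeout_ms)
{
    struct timespec deadline;
    int rc = 0;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_nsec += timeout_ms * 1000000L;
    deadline.tv_sec += deadline.tv_nsec / 1000000000L;
    deadline.tv_nsec %= 1000000000L;
    pthread_mutex_lock(&q->guard);
    while (q->count == 0 && rc == 0)
        rc = pthread_cond_timedwait(&q->not_empty, &q->guard, &deadline);
    if (q->count == 0) {             /* Timed out (or failed) while empty. */
        pthread_mutex_unlock(&q->guard);
        return rc;
    }
    *item = q->items[q->head];
    q->head = (q->head + 1) % DEMO_QLEN;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->guard);
    return 0;
}

/* Stage loop: process items; on a time-out, exit if shutdown is set. */
static void *demo_stage(void *arg)
{
    demo_stage_t *stage = arg;
    int item;
    for (;;) {
        int status = demo_get(stage->q, &item, DEMO_TIMEOUT_MS);
        if (status == 0) { stage->processed++; continue; }
        if (status != ETIMEDOUT || atomic_load(&demo_shutdown)) break;
    }
    return NULL;
}

/* Boss: feed n_items, set the flag, and join. Returns items processed. */
int demo_run_stage(int n_items)
{
    demo_queue_t q;
    demo_stage_t stage = { &q, 0 };
    pthread_t th;
    q.count = q.head = 0;
    pthread_mutex_init(&q.guard, NULL);
    pthread_cond_init(&q.not_empty, NULL);
    pthread_cond_init(&q.not_full, NULL);
    atomic_store(&demo_shutdown, 0);
    if (pthread_create(&th, NULL, demo_stage, &stage) != 0) return -1;
    for (int i = 0; i < n_items; i++) demo_put(&q, i);
    atomic_store(&demo_shutdown, 1);   /* Request an orderly shutdown. */
    pthread_join(th, NULL);            /* No forced termination needed. */
    pthread_cond_destroy(&q.not_full);
    pthread_cond_destroy(&q.not_empty);
    pthread_mutex_destroy(&q.guard);
    return stage.processed;
}
```

The cost of this design is latency at shutdown: the stage may take up to one time-out interval to notice the flag, so the time-out value trades shutdown responsiveness against the overhead of waking an idle thread.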
