Generalized Producer-Consumer solution using Semaphore

The producer-consumer problem is a classical example of a multi-process synchronization problem. It illustrates the need for synchronization in systems where many processes share a resource. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer. The producer's job is to generate a piece of data, put it into the buffer and start again. At the same time the consumer is consuming the data (i.e. removing it from the buffer) one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.

Here, the problem is generalized to multiple producers and multiple consumers. A solution to this type of problem must satisfy three constraints:
  1. Mutual exclusion
  2. Freedom from deadlock
  3. Freedom from starvation

Pseudo-code for a solution to the multiple producer-consumer problem is given below.

Semaphore fullBuffer = 0;            // Initially, no items in the buffer
Semaphore emptyBuffers = numBuffers; // Initially, every slot is empty
Semaphore mutex = 1;                 // No thread is updating the buffer

Producer(item) {
    emptyBuffers.P();  // Wait until there is space
    mutex.P();         // Wait until the buffer is free
    Enqueue(item);
    mutex.V();         // Release the buffer
    fullBuffer.V();    // Tell consumers there is data in the buffer
}

Consumer() {
    fullBuffer.P();    // Wait until there is data
    mutex.P();         // Wait until the buffer is free
    item = Dequeue();
    mutex.V();         // Release the buffer
    emptyBuffers.V();  // Tell producers there is space in the buffer
    return item;
}

The mutex ensures that only one process accesses the buffer at a time, which protects the queue data structure (the shared buffer) from corruption. Without it, concurrent processes could update the head and tail pointers of the buffer at the same time and leave them in an inconsistent state. emptyBuffers and fullBuffer are counting semaphores that track the number of empty and filled slots, respectively. Each thread waits on its counting semaphore before taking the mutex; reversing that order could deadlock, because a thread blocked on the semaphore would still be holding the mutex.
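
The bookkeeping done by the two counting semaphores can be observed in isolation. The following is a minimal single-threaded sketch, not part of the original program: it assumes POSIX unnamed semaphores (as on Linux), and the names NUM_BUFFERS, struct counts and simulate() are hypothetical, introduced only for illustration. It performs the P/V steps of the pseudo-code without a real queue and reports both counts through sem_getvalue().

```c
#include <assert.h>
#include <semaphore.h>

#define NUM_BUFFERS 5 /* hypothetical capacity, chosen for illustration */

/* Snapshot of the two counting semaphores. */
struct counts {
    int empty; /* number of free slots */
    int full;  /* number of filled slots */
};

static sem_t emptyBuffers;
static sem_t fullBuffer;

/* Semaphore steps of one Producer() call; the enqueue itself is omitted. */
static void produce_one(void)
{
    sem_wait(&emptyBuffers); /* P: claim a free slot */
    sem_post(&fullBuffer);   /* V: announce a filled slot */
}

/* Semaphore steps of one Consumer() call; the dequeue itself is omitted. */
static void consume_one(void)
{
    sem_wait(&fullBuffer);   /* P: claim a filled slot */
    sem_post(&emptyBuffers); /* V: announce a free slot */
}

/*
 * Runs `produced` producer steps followed by `consumed` consumer steps
 * and returns the resulting semaphore values. Because everything runs
 * in one thread, the arguments must never make a sem_wait() block.
 */
struct counts simulate(int produced, int consumed)
{
    struct counts c;
    int i, rc;

    assert(produced >= 0 && produced <= NUM_BUFFERS);
    assert(consumed >= 0 && consumed <= produced);

    rc = sem_init(&emptyBuffers, 0, NUM_BUFFERS); /* all slots empty */
    assert(rc == 0);
    rc = sem_init(&fullBuffer, 0, 0);             /* no slot full */
    assert(rc == 0);
    (void)rc;

    for (i = 0; i < produced; i++)
        produce_one();
    for (i = 0; i < consumed; i++)
        consume_one();

    sem_getvalue(&emptyBuffers, &c.empty);
    sem_getvalue(&fullBuffer, &c.full);

    sem_destroy(&emptyBuffers);
    sem_destroy(&fullBuffer);
    return c;
}
```

Calling simulate(3, 1) from a main function leaves 3 empty slots and 2 full ones. While no thread is between its P and its V, the two counts always add up to the buffer capacity.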

The code implementing the given pseudo-code is shown below:

/*
* main.c
*
* Created on: Dec 31, 2008
* Author: Suvash Sedhain
* Multiple producer and Multiple consumer solution using Semaphores
* Note: semaphore.P() is actually sem_wait() and semaphore.V() is sem_post()
* as defined in semaphore.h
*
* Ref:
* - Operating System Concepts : Abraham Silberschatz, Peter Baer Galvin, Greg Gagne
* - Operating Systems, William Stallings
* - Professor John D. Kubiatowicz lecture notes, University of California, Berkeley
* website: http://inst.eecs.berkeley.edu/~cs162
*
*/

#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdio.h>
#define BUFFER_SIZE 5
// compile-time constants, so the thread-id arrays in main() have a fixed size
#define MAX_PRODUCER 20
#define MAX_CONSUMER 20

// information to maintain the circular queue data structure
int head = 0;
int tail = 0;

//shared buffer
int buffer[BUFFER_SIZE];
// mutex lock for buffer
pthread_mutex_t mutex;
// counting semaphores for the number of empty and full slots
sem_t emptyBuffers;
sem_t fullBuffer;

// initialize the mutex and the semaphores
void initialize_locks(void)
{
    pthread_mutex_init(&mutex, NULL);
    sem_init(&emptyBuffers, 0, BUFFER_SIZE); // every slot starts empty
    sem_init(&fullBuffer, 0, 0);             // no slot starts full
}

// Produce random values into the shared buffer
void *producer(void *param)
{
    int item;
    while (1)
    {
        item = rand();
        sem_wait(&emptyBuffers);         // wait for a free slot
        pthread_mutex_lock(&mutex);      // enter the critical section
        buffer[tail] = item;
        tail = (tail + 1) % BUFFER_SIZE;
        printf("producer: inserted %d \n", item);
        fflush(stdout);
        pthread_mutex_unlock(&mutex);    // leave the critical section
        sem_post(&fullBuffer);           // signal that a slot is now full
    }
    // not reached: the loop above never exits
    printf("producer quitting\n");
    fflush(stdout);
    return NULL;
}

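// Consume values from the shared buffer
void *consumer(void *param)
{
    int item;
    while (1)
    {
        sem_wait(&fullBuffer);           // wait for a full slot
        pthread_mutex_lock(&mutex);      // enter the critical section
        item = buffer[head];
        head = (head + 1) % BUFFER_SIZE;
        printf("consumer: removed %d \n", item);
        fflush(stdout);
        pthread_mutex_unlock(&mutex);    // leave the critical section
        sem_post(&emptyBuffers);         // signal that a slot is now empty
    }
    // not reached: the loop above never exits
    return NULL;
}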

int main(int argc, char *argv[])
{
    int i, sleep_time = 100, no_of_consumer_threads = 10, no_of_producer_threads = 2;
    pthread_t producer_id[MAX_PRODUCER], consumer_id[MAX_CONSUMER];
    initialize_locks();
    // create producer threads
    for (i = 0; i < no_of_producer_threads; i++)
    {
        if (pthread_create(&producer_id[i], NULL, producer, NULL) != 0)
        {
            fprintf(stderr, "Unable to create producer thread\n");
            return -1;
        }
    }
    // create consumer threads
    for (i = 0; i < no_of_consumer_threads; i++)
    {
        if (pthread_create(&consumer_id[i], NULL, consumer, NULL) != 0)
        {
            fprintf(stderr, "Unable to create consumer thread\n");
            return -1;
        }
    }
    // let the threads run for sleep_time seconds;
    // returning from main() then ends the process and all of its threads
    sleep(sleep_time);
    return 0;
}
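
The program above runs its threads in endless loops and stops only when main() returns. For testing, a variant that terminates on its own can be easier to reason about. The following is a sketch under stated assumptions, not the original author's code: each producer inserts the values 1 to per_producer, and after all producers have been joined one STOP sentinel per consumer is enqueued so that every consumer exits. The names STOP, MAX_THREADS, put(), get() and run_bounded() are hypothetical, and the sentinel approach is a technique swapped in for illustration. It assumes POSIX unnamed semaphores, as on Linux.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 5
#define MAX_THREADS 20
#define STOP (-1) /* hypothetical sentinel: tells one consumer to exit */

static int buffer[BUFFER_SIZE];
static int head, tail;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static sem_t emptyBuffers, fullBuffer;
static int items_per_producer;            /* set before any thread starts */
static long consumed_count, consumed_sum; /* both protected by mutex */

/* Same steps as the producer loop body: wait for space, insert, signal. */
static void put(int item)
{
    sem_wait(&emptyBuffers);
    pthread_mutex_lock(&mutex);
    buffer[tail] = item;
    tail = (tail + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&mutex);
    sem_post(&fullBuffer);
}

/* Same steps as the consumer loop body, plus bookkeeping under the mutex. */
static int get(void)
{
    int item;
    sem_wait(&fullBuffer);
    pthread_mutex_lock(&mutex);
    item = buffer[head];
    head = (head + 1) % BUFFER_SIZE;
    if (item != STOP) {
        consumed_count++;
        consumed_sum += item;
    }
    pthread_mutex_unlock(&mutex);
    sem_post(&emptyBuffers);
    return item;
}

static void *producer(void *param)
{
    (void)param;
    for (int i = 1; i <= items_per_producer; i++)
        put(i);
    return NULL;
}

static void *consumer(void *param)
{
    (void)param;
    while (get() != STOP)
        ; /* keep consuming until the sentinel arrives */
    return NULL;
}

/* Runs the bounded workload; returns the item count and stores the sum. */
long run_bounded(int producers, int consumers, int per_producer, long *sum_out)
{
    pthread_t prod[MAX_THREADS], cons[MAX_THREADS];
    int i, rc;

    assert(producers >= 0 && producers <= MAX_THREADS);
    assert(consumers >= 1 && consumers <= MAX_THREADS);
    head = tail = 0;
    consumed_count = consumed_sum = 0;
    items_per_producer = per_producer;
    rc = sem_init(&emptyBuffers, 0, BUFFER_SIZE);
    assert(rc == 0);
    rc = sem_init(&fullBuffer, 0, 0);
    assert(rc == 0);

    for (i = 0; i < consumers; i++) {
        rc = pthread_create(&cons[i], NULL, consumer, NULL);
        assert(rc == 0);
    }
    for (i = 0; i < producers; i++) {
        rc = pthread_create(&prod[i], NULL, producer, NULL);
        assert(rc == 0);
    }
    (void)rc;

    for (i = 0; i < producers; i++)
        pthread_join(prod[i], NULL);
    for (i = 0; i < consumers; i++)
        put(STOP); /* queued behind every real item */
    for (i = 0; i < consumers; i++)
        pthread_join(cons[i], NULL);

    sem_destroy(&emptyBuffers);
    sem_destroy(&fullBuffer);
    *sum_out = consumed_sum;
    return consumed_count;
}
```

With 2 producers, 3 consumers and 100 items per producer, run_bounded() should report 200 consumed items with a sum of 10100, whatever order the threads happen to run in. Joining the threads instead of sleeping also makes it possible to check that no item was lost or consumed twice.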