#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
/* Not referenced by the code visible in this file; presumably 60 s
   expressed in microseconds, by analogy with ONE_SECOND. */
#define SIXTY_SECONDS 60000000
/* One second in microseconds; used as the exclusive upper bound of the
   random usleep() delay in the producer and consumer loops. */
#define ONE_SECOND 1000000
/* Values stored in the car park are drawn from [0, RANGE). */
#define RANGE 10
/* Number of seconds the monitor thread sleeps between reports. */
#define PERIOD 2
/*
 * Shared state of the car park: a fixed-size circular buffer of ints plus
 * the synchronisation objects that guard it.  One instance is shared by
 * every producer, consumer and monitor thread.
 */
typedef struct {
    int *carpark;           /* slot storage, `capacity` elements */
    int capacity;           /* total number of slots */
    int occupied;           /* number of slots currently filled */
    int nextin;             /* index the next producer writes to */
    int nextout;            /* index the next consumer releases */
    int cars_in;            /* running count of insertions */
    int cars_out;           /* running count of removals */
    pthread_mutex_t lock;   /* taken around every access to the counters */
    pthread_cond_t space;   /* consumers wait here while the park is empty;
                               producers signal it after inserting */
    pthread_cond_t car;     /* producers wait here while the park is full;
                               consumers signal it after removing */
    pthread_barrier_t bar;  /* not used by any function in this file */
} cp_t;
/*
 * Producer thread body.
 *
 * cp_in: pointer to the shared cp_t, passed through pthread_create().
 *
 * Repeatedly sleeps for a random interval of less than one second, then
 * stores a random value in [0, RANGE) in the next free slot of the car
 * park.  The loop never terminates, so the function does not return in
 * practice.
 */
static void *
producer(void *cp_in)
{
    cp_t *cp = (cp_t *)cp_in;
    unsigned int seed;

    /*
     * rand_r() needs an initialised state.  Draw this thread's seed from
     * the process-wide rand() stream (seeded by srand() in init()) while
     * holding the shared lock, so that concurrent threads doing the same
     * are serialised and each obtains a different seed.
     */
    pthread_mutex_lock(&cp->lock);
    seed = (unsigned int)rand();
    pthread_mutex_unlock(&cp->lock);

    while (1) {
        /* Sleep for up to 1s */
        usleep(rand_r(&seed) % ONE_SECOND);

        pthread_mutex_lock(&cp->lock);

        /*
         * While the car park is full, wait for a consumer to free a slot.
         * Note: despite its name, `car` is the condition that consumers
         * signal after removing an entry.  The loop re-checks the
         * predicate after every wake-up.
         */
        while (cp->occupied == cp->capacity) {
            pthread_cond_wait(&cp->car, &cp->lock);
        }

        /* Insert an item and advance the circular write index */
        cp->carpark[cp->nextin] = rand_r(&seed) % RANGE;
        cp->occupied++;
        cp->nextin = (cp->nextin + 1) % cp->capacity;
        cp->cars_in++;

        /* A consumer may be waiting (on `space`) for data to arrive */
        pthread_cond_signal(&cp->space);

        pthread_mutex_unlock(&cp->lock);
    }

    return (NULL);
}
/*
 * Consumer thread body.
 *
 * cp_in: pointer to the shared cp_t, passed through pthread_create().
 *
 * Repeatedly sleeps for a random interval of less than one second, then
 * releases the oldest occupied slot of the car park.  The stored value is
 * not read; only the counters and the read index are updated.  The loop
 * never terminates, so the function does not return in practice.
 */
static void *
consumer(void *cp_in)
{
    cp_t *cp = (cp_t *)cp_in;
    unsigned int seed;

    /*
     * rand_r() needs an initialised state.  Draw this thread's seed from
     * the process-wide rand() stream (seeded by srand() in init()) while
     * holding the shared lock, so that concurrent threads doing the same
     * are serialised and each obtains a different seed.
     */
    pthread_mutex_lock(&cp->lock);
    seed = (unsigned int)rand();
    pthread_mutex_unlock(&cp->lock);

    while (1) {
        /* Sleep for up to 1s */
        usleep(rand_r(&seed) % ONE_SECOND);

        pthread_mutex_lock(&cp->lock);

        /*
         * While the car park is empty, wait for a producer to insert.
         * Note: despite its name, `space` is the condition that producers
         * signal after adding an entry.  The loop re-checks the predicate
         * after every wake-up.
         */
        while (cp->occupied == 0) {
            pthread_cond_wait(&cp->space, &cp->lock);
        }

        /* Release one slot and advance the circular read index */
        cp->occupied--;
        cp->nextout = (cp->nextout + 1) % cp->capacity;
        cp->cars_out++;

        /* A producer may be waiting (on `car`) for room to become free */
        pthread_cond_signal(&cp->car);

        pthread_mutex_unlock(&cp->lock);
    }

    return (NULL);
}
/* Our monitor thread will each execute this function */
static void *
monitor(void *cp_in)
{
cp_t *cp;
/* Convert what
was passed in to a pointer to a bounded buffer */
cp = (cp_t
*)cp_in;
while (1) {
/* Pause */
sleep(PERIOD);
/* Acquire the
lock */
pthread_mutex_lock(&cp->lock);
printf("Delta: %d\n", cp->cars_in - cp->cars_out);
/* Release the
lock */
pthread_mutex_unlock(&cp->lock);
}
return ((void
*)NULL);
}
/* Report a failed pthread initialisation call (rc != 0) and exit. */
static void
init_check(int rc, const char *what)
{
    if (rc != 0) {
        /* pthread functions return the error number instead of setting errno */
        errno = rc;
        perror(what);
        exit(EXIT_FAILURE);
    }
}

/*
 * Initialise a car park.
 *
 * cp:       structure to set up; every field except `bar` is initialised.
 * capacity: number of slots; must be greater than zero.
 *
 * Returns 0 on success.  Any failure (invalid capacity, allocation
 * failure, mutex/condition-variable initialisation failure) prints a
 * diagnostic to stderr and terminates the process with EXIT_FAILURE.
 * The slot array is owned by `cp` and is never freed by this function.
 */
static int
init(cp_t *cp, int capacity)
{
    /*
     * A capacity of zero would leave every producer and consumer blocked
     * forever, and a negative one is meaningless, so reject both up front.
     */
    if (capacity <= 0) {
        fprintf(stderr, "init(): capacity must be positive (got %d)\n",
            capacity);
        exit(EXIT_FAILURE);
    }

    /* Set up the bounded buffer internals */
    cp->occupied = 0;
    cp->nextin = 0;
    cp->nextout = 0;
    cp->cars_in = 0;
    cp->cars_out = 0;
    cp->capacity = capacity;

    /* calloc() checks the count * size multiplication for overflow */
    cp->carpark = calloc((size_t)capacity, sizeof (*cp->carpark));
    if (cp->carpark == NULL) {
        perror("calloc()");
        exit(EXIT_FAILURE);
    }

    /* Initialise lock and condition variables */
    init_check(pthread_mutex_init(&cp->lock, NULL), "pthread_mutex_init()");
    init_check(pthread_cond_init(&cp->space, NULL), "pthread_cond_init()");
    init_check(pthread_cond_init(&cp->car, NULL), "pthread_cond_init()");

    /* Seed the process-wide rand() generator */
    srand((unsigned int)getpid());

    return (0);
}
int
main(int argc, char *argv[])
{
pthread_t p, c,
m;
cp_t cp;
/* Check usage */
if (argc != 2) {
printf("Usage: %s buffer_size\n", argv[0]);
exit(EXIT_FAILURE);
}
/* Initialise */
init(&cp,
atoi(argv[1]));
/* Create our
threads */
pthread_create(&p, NULL, producer, (void *)&cp);
pthread_create(&p, NULL, producer, (void *)&cp);
pthread_create(&c, NULL, consumer, (void *)&cp);
pthread_create(&c,
NULL, consumer, (void *)&cp);
pthread_create(&m, NULL, monitor, (void *)&cp);
/* Wait for our
threads */
pthread_join(p,
NULL);
pthread_join(p,
NULL);
pthread_join(c,
NULL);
pthread_join(c,
NULL);
pthread_join(m,
NULL);
return (0);
}
/* 0 Comments (residue from the web page this listing was copied from; not part of the program) */