The following is the source code for post >>>/os/5

\textbf{Producers and Consumers}
Assume there are \[N\] producer threads, \[M\] consumer threads, and a shared circular buffer of size \[S\] (pic related). Producers always put data into the shared buffer, while consumers always take data out of it.
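As a rough illustration of the circular buffer by itself, before any threads are involved, here is a minimal single-threaded sketch. The names `struct ring`, `ring_put`, `ring_get` and `RING_CAPACITY` are made up for this example and do not appear in the post. The only point is the wrap-around of the indices and the explicit full/empty checks.

```c
#include <stddef.h>

#define RING_CAPACITY 4  /* stands in for S; value chosen only for illustration */

/* Hypothetical ring buffer type, not taken from the post. */
struct ring {
    int    slots[RING_CAPACITY];
    size_t head;   /* next slot to write */
    size_t tail;   /* next slot to read */
    size_t count;  /* number of occupied slots */
};

/* Store x; returns 0 on success, -1 if the buffer is full. */
int ring_put(struct ring *r, int x)
{
    if (r->count == RING_CAPACITY)
        return -1;
    r->slots[r->head] = x;
    r->head = (r->head + 1) % RING_CAPACITY;  /* wrap around at the end */
    r->count++;
    return 0;
}

/* Fetch the oldest item into *out; returns 0 on success, -1 if empty. */
int ring_get(struct ring *r, int *out)
{
    if (r->count == 0)
        return -1;
    *out = r->slots[r->tail];
    r->tail = (r->tail + 1) % RING_CAPACITY;  /* wrap around at the end */
    r->count--;
    return 0;
}
```

This version is not thread-safe, and it reports "full" or "empty" with an error code instead of blocking. The semaphores in the rest of the post replace those two checks.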

If producers generate data too quickly, the buffer overflows. Similarly, if consumers fetch data too quickly, the buffer empties out and they start fetching invalid or stale data. In addition, the buffer has to be protected with a mutex lock; otherwise, producers could compete for the same slot and consumers could fetch the same item twice.

The latter problem is solved by surrounding buffer operations with a mutex lock/unlock, which makes the buffer MT-Safe. The former problem is solved in a way similar to the A-B synchronization in >>3 ("abababababababab..."): producers block when the buffer is full and consumers block when it is empty, and each side signals the other every time it puts data into or takes data out of the buffer.
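The blocking rule can be poked at without any threads. The sketch below assumes two counting semaphores with the hypothetical names `free_slots` and `used_slots`, and a made-up helper `puts_before_blocking`. It uses `sem_trywait`, which fails with `EAGAIN` exactly where `sem_wait` would block, to count how many items a producer can put in before it would have to wait for a consumer.

```c
#include <errno.h>
#include <semaphore.h>

#define SLOTS 3  /* illustrative buffer size, standing in for S */

/* Returns how many times a producer can claim a free slot before it
 * would block when nobody is consuming, or -1 on an unexpected error.
 * Hypothetical helper, not part of the post. */
int puts_before_blocking(void)
{
    sem_t free_slots, used_slots;
    int n = 0;

    if (sem_init(&free_slots, 0, SLOTS) != 0)
        return -1;
    if (sem_init(&used_slots, 0, 0) != 0) {
        sem_destroy(&free_slots);
        return -1;
    }

    /* Each successful trywait claims one free slot. */
    while (sem_trywait(&free_slots) == 0) {
        sem_post(&used_slots);  /* signal: one more item is available */
        n++;
    }
    /* The loop ended on a failed trywait; EAGAIN means "would block". */
    int would_block = (errno == EAGAIN);

    int used = 0;
    sem_getvalue(&used_slots, &used);  /* filled slots waiting for consumers */

    sem_destroy(&free_slots);
    sem_destroy(&used_slots);
    return (would_block && used == n) ? n : -1;
}
```

With no consumer posting `free_slots`, the producer stalls after exactly `SLOTS` puts, and `used_slots` then holds the number of items a consumer could take without blocking.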

\textbf{Implementation}

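POSIX semaphores used (note the naming: \`full\` starts at BUFFER_SIZE and counts the free slots left before the buffer is full, while \`empty\` starts at 0 and counts the filled slots left before the buffer is empty):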
```
sem_t mutex, full, empty;
int main() {
    sem_init(&mutex, 0, 1);
    sem_init(&full,  0, BUFFER_SIZE);
    sem_init(&empty, 0, 0);
    ...
    sem_destroy(&mutex);
    sem_destroy(&full);
    sem_destroy(&empty);
}
```

Producer threads:
```
void* producer(void* data)
{
    for ( ; ; ) {
        auto data = PRODUCE_DATA();
        sem_wait(&full);
        sem_wait(&mutex);
        BUFFER_PUT(buffer, data);
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&empty) == 0);
    }
    return NULL;
}
```

Consumer threads:
```
void* consumer(void* data)
{
    for ( ; ; ) {
        sem_wait(&empty);               /* block while no filled slot is available */
        sem_wait(&mutex);
        auto data = BUFFER_GET(buffer);
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&full) == 0);   /* one more free slot for producers */
        CONSUME_DATA(data);
    }
    return NULL;
}
```

Full code:
```
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>

#define RED(s) ("\e[7m\e[31m" s "\e[0m")
#define GREEN(s) ("\e[32m" s "\e[0m")
#define BUFFER_SIZE 10
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 10

int buffer[BUFFER_SIZE], head = 0, tail = 0, count = 0;
sem_t mutex, empty, full;
void* producer(void* data)
{
    for ( ; ; ) {
        /* produce data */
        int x = rand() % 1000;

        /* put into buffer (thread-safe) */
        assert(sem_wait(&full) == 0);
        assert(sem_wait(&mutex) == 0);
        buffer[head] = x;
        head = (head + 1) % BUFFER_SIZE;
        count += 1;
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&empty) == 0);
    }
    return NULL;
}

void* consumer(void* data)
{
    for ( ; ; ) {
        /* get data from buffer (thread-safe) */
        assert(sem_wait(&empty) == 0);
        assert(sem_wait(&mutex) == 0);
        int x = buffer[tail];
        tail = (tail + 1) % BUFFER_SIZE;
        count -= 1;
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&full) == 0);

        /* consume data */
        printf("%d ", x);
    }
    return NULL;
}

int main()
{
    assert(sem_init(&mutex, 0, 1) == 0);
    assert(sem_init(&full,  0, BUFFER_SIZE) == 0);
    assert(sem_init(&empty, 0, 0) == 0);

    pthread_t tc[CONSUMER_COUNT], tp[PRODUCER_COUNT];
    int i;
    for (i = 0; i < CONSUMER_COUNT; i++)
        assert(pthread_create(&tc[i], NULL, consumer, NULL) == 0);
    for (i = 0; i < PRODUCER_COUNT; i++)
        assert(pthread_create(&tp[i], NULL, producer, NULL) == 0);
    for (i = 0; i < CONSUMER_COUNT; i++)
        assert(pthread_join(tc[i], NULL) == 0);
    for (i = 0; i < PRODUCER_COUNT; i++)
        assert(pthread_join(tp[i], NULL) == 0);

    assert(sem_destroy(&mutex) == 0);
    assert(sem_destroy(&empty) == 0);
    assert(sem_destroy(&full) == 0);
    return 0;
}
```
Compile with \`gcc -pthread main.c\` and run with \`./a.out\`, interrupt with \`CTRL+C\`.
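
The program above never terminates, so it is hard to tell whether every produced item is consumed exactly once. The following is a bounded variant, written as a sketch under a few assumptions: each producer emits the numbers 1..PER_THREAD, each consumer takes a fixed quota, and the semaphores are renamed to the hypothetical `free_slots` and `used_slots` (the post's `full` and `empty`). It also swaps `assert` for a made-up `CHECK` macro, because calls placed inside `assert` disappear when the program is compiled with `-DNDEBUG`.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

#define BUF_SIZE   4
#define PRODUCERS  3
#define CONSUMERS  2
#define PER_THREAD 1000                                  /* items per producer */
#define QUOTA      (PRODUCERS * PER_THREAD / CONSUMERS)  /* items per consumer */

/* Unlike assert, CHECK still performs the call when NDEBUG is defined. */
#define CHECK(call) \
    do { if ((call) != 0) { fprintf(stderr, "failed: %s\n", #call); abort(); } } while (0)

static int   buf[BUF_SIZE], head, tail;
static sem_t mutex, free_slots, used_slots;
static long  consumed_sum;  /* protected by mutex */

static void *producer(void *arg)
{
    (void)arg;
    for (int i = 1; i <= PER_THREAD; i++) {
        CHECK(sem_wait(&free_slots));   /* block while the buffer is full */
        CHECK(sem_wait(&mutex));
        buf[head] = i;
        head = (head + 1) % BUF_SIZE;
        CHECK(sem_post(&mutex));
        CHECK(sem_post(&used_slots));   /* wake one waiting consumer */
    }
    return NULL;
}

static void *consumer(void *arg)
{
    (void)arg;
    long local = 0;
    for (int i = 0; i < QUOTA; i++) {
        CHECK(sem_wait(&used_slots));   /* block while the buffer is empty */
        CHECK(sem_wait(&mutex));
        int x = buf[tail];
        tail = (tail + 1) % BUF_SIZE;
        CHECK(sem_post(&mutex));
        CHECK(sem_post(&free_slots));   /* wake one waiting producer */
        local += x;
    }
    CHECK(sem_wait(&mutex));
    consumed_sum += local;              /* publish this thread's total */
    CHECK(sem_post(&mutex));
    return NULL;
}

/* Runs all threads to completion and returns the sum of consumed items. */
long run_bounded(void)
{
    pthread_t tp[PRODUCERS], tc[CONSUMERS];
    head = tail = 0;
    consumed_sum = 0;
    CHECK(sem_init(&mutex, 0, 1));
    CHECK(sem_init(&free_slots, 0, BUF_SIZE));
    CHECK(sem_init(&used_slots, 0, 0));
    for (int i = 0; i < CONSUMERS; i++)
        CHECK(pthread_create(&tc[i], NULL, consumer, NULL));
    for (int i = 0; i < PRODUCERS; i++)
        CHECK(pthread_create(&tp[i], NULL, producer, NULL));
    for (int i = 0; i < PRODUCERS; i++)
        CHECK(pthread_join(tp[i], NULL));
    for (int i = 0; i < CONSUMERS; i++)
        CHECK(pthread_join(tc[i], NULL));
    CHECK(sem_destroy(&mutex));
    CHECK(sem_destroy(&free_slots));
    CHECK(sem_destroy(&used_slots));
    return consumed_sum;
}
```

Calling `run_bounded()` from a `main` and comparing the result with `PRODUCERS * PER_THREAD * (PER_THREAD + 1) / 2` gives a quick sanity check: a lost or duplicated item would change the sum. The fixed quota only works because the total item count is divisible by `CONSUMERS`; other termination schemes (for example a sentinel value) are possible.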