[ home ] [ math / cs / ai / phy / as / chem / bio / geo ] [ civ / aero / mech / ee / hdl / os / dev / web / app / sys / net / sec ] [ med / fin / psy / soc / his / lit / lin / phi / arch ] [ off / vg / jp / 2hu / tc / ts / adv / hr / meta / tex ] [ chat ] [ wiki ]

Viewing source code

The following is the source code for post >>>/os/7

\textbf{Barrier synchronization}
Barriers are used accumulate a certain number of threads on the barrier letting them continue. 

Assume there are 5 threads downloading video clips from the internet for the purpose of splicing them together. Some threads will finish quicker than others depending on download speed and the size of the clip. In any case, every downloader thread has to wait for other four to finish downloading before splicing can happen.

Barriers are natively implemented by pthreads
```
static int count = 5;
pthread_barrier_t bar;

void *thread(void *data) {
    for ( ; ; ) {
        ...
        pthread_barrier_wait(&bar);
        ...
    }
}

int main() {
    ...
    pthread_barrier_init(&bar, NULL, count);
    ...
    pthread_barrier_destroy(&bar);
}
```

Implementation using POSIX semaphores involves simply counting the number of threads blocked on the barrier and flushing all of them after a certain number is reached. The counter variable has to be protected with a mutex of course.

\textbf{Implementation}
bar.h
```
#ifndef __BARRIER_H__
#define __BARRIER_H__
#include <semaphore.h>
#include <stdlib.h>
struct barrier_t {
    sem_t b_mutex;
    sem_t b_barrier;
    int b_required;
    int b_count;
    void* b_retval;
};

void bar_init(struct barrier_t *bar, int required);
void bar_wait(struct barrier_t *bar);
void bar_destroy(struct barrier_t *bar);

#endif
```

bar.c
```
#include "bar.h"

void bar_init(struct barrier_t *bar, int required)
{
    sem_init(&bar->b_mutex, 0, 1);
    sem_init(&bar->b_barrier, 0, 0);
    bar->b_required = required;
    bar->b_count = 0;
    bar->b_retval = NULL;
}

void bar_wait(struct barrier_t *bar)
{
    sem_wait(&bar->b_mutex);
    bar->b_count += 1;
    if (bar->b_count == bar->b_required) {
        bar->b_count = 0;
        int i;
        for (i = 0; i < bar->b_required; i++)
            sem_post(&bar->b_barrier);
    }
    sem_post(&bar->b_mutex);
    sem_wait(&bar->b_barrier);
}

void bar_destroy(struct barrier_t *bar)
{
    sem_destroy(&bar->b_mutex);
    sem_destroy(&bar->b_barrier);
}
```

main.c
```
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "bar.h"

#define B 4
#define N 12

#define REV "\e[7m"
#define COLOR(c) "\e[" c "m"
#define NONE "\e[0m"
#define RED  31

struct barrier_t bar[B];

void *thread_func(void *data)
{
    int *id = (int*)(data);
    const char *fmt = REV COLOR("%d") " %d " NONE; 
    int iter = 0, prev = 0;
    for ( ; ; ) {
        prev = iter;
        iter = (iter + 1) % B;
        bar_wait(&bar[iter]); /* try commenting out this line ;) */
        printf(fmt, RED + iter, *id);
        usleep(50);
    }
    return (void*)id;
} 

int main()
{
    pthread_t thread[N];
    int i;
    for (i = 0; i < B; i++)
        bar_init(&bar[i], N);
    for (i = 0; i < N; i++) {
        int *id = calloc(1, sizeof(int));
        *id = i;
        pthread_create(&thread[i], NULL, thread_func, (void*)id);
    }
    for (int i = 0; i < N; i++) {
        int *id = NULL;
        pthread_join(thread[i], (void**)&id);
        free(id);
    }
    for (i = 0; i < B; i++)
        bar_destroy(&bar[i]);
    return 0;
}
```

Compile using \`gcc -pthread main.c bar.c\`, run with \`./a.out\`. Interrupt with \`CTRL + C\`.