
/os/ - Operating Systems



25 Dec 2021 - Mathchan is launched into public


File: Semaphore.png ( 16.56 KB , 368x207 , 1655551926593.png )

What is threading?
Consider a function that contains an infinite loop:
void* a(void* data)
{
    for ( ; ; ) {
        printf("a");
    }
    return NULL;
}

Calling the function
a(NULL);
would cause the program to print "a" forever.

Now consider another function with an infinite loop:
void* b(void* data) 
{
    for ( ; ; ) {
        printf("b");
    }
    return NULL;
}

Calling the function
b(NULL);
would cause the program to print "b" forever.

What if we were to consecutively call
a(NULL); b(NULL);
? Well, since
a()
contains an infinite loop, the call to
b()
is never going to happen, so all we're going to get is "aaaaaaaaaaaaaaaa". Similarly, if we were to call
b(NULL); a(NULL);
all we're going to get is "bbbbbbbbbbbbbbbbb". Is there a way to make the program print a little bit of "a" and a little bit of "b" (i.e. "aaaabbbbbbbaaabbbb") without changing the code (i.e. keeping the infinite loops)? This is what threads are about. By creating threads, we make it possible for multiple infinite loops to execute concurrently.

Note that there is a slight difference between concurrency and parallelism. Parallel execution means two tasks are literally running at the same time (e.g. on two CPU cores). Concurrent execution means two tasks are taking turns: task 1 is executing, then task 2, then task 1 again... If context switching happens fast enough (the OS scheduler is the one deciding this), it appears as though they were running in parallel. Most synchronization techniques work the same for both, so we'll make no distinction.


By default, every program is single-threaded and the only thread running is the one executing the
main()
function. We can create more threads by calling
pthread_create()
and passing the pointer to a callback function (
void* (*cb)(void*);
). If we create two threads - one for function
a()
and one for function
b()
- we will end up with three threads:

  1. Thread executing a()
  2. Thread executing b()
  3. Thread executing main()

Once threads are created they are marked as runnable, and the scheduler will grant them the opportunity to execute at some point. The
main()
thread, on the other hand, is just going to continue its execution as normal. If the main thread encounters
return 0;
the operating system is going to kill the program along with all the threads started. To prevent this from happening, we can use
pthread_join()
to tell the main thread to wait for other threads to finish. Since other threads contain infinite loops, they are never going to finish. The alternative to thread joining would be putting an infinite loop in the main function just after thread creation. Most pthread functions return 0 upon success and a positive error number upon error (they do not return -1 and do not set errno). We can use
assert()
as a rudimentary runtime error check (keep in mind that compiling with -DNDEBUG removes assert() together with the call inside it).
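To make the callback part more concrete, here is a minimal sketch (it is not part of the main.c in this post). pthread_create() accepts any function of the shape void* f(void*), hands it the fourth argument as data, and pthread_join() gives back whatever the function returned. The names add_one, run_once and check are made up for this example; check is a hypothetical helper that prints the error number a pthread function returned.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: pthread functions return 0 or an error number. */
static void check(int rc, const char *what)
{
    if (rc != 0) {
        fprintf(stderr, "%s: %s\n", what, strerror(rc));
        exit(EXIT_FAILURE);
    }
}

/* Has the shape void* f(void*), so it can be used as a thread callback. */
static void *add_one(void *data)
{
    int *n = data;      /* the 4th argument of pthread_create() */
    *n += 1;
    return n;           /* handed back through pthread_join() */
}

int run_once(void)
{
    pthread_t t;
    int value = 41;
    void *ret = NULL;

    check(pthread_create(&t, NULL, add_one, &value), "pthread_create");
    check(pthread_join(t, &ret), "pthread_join");
    assert(ret == &value);  /* the callback returned its own argument */
    return value;
}
```

Calling run_once() from main() should give 42: the new thread incremented the variable it was handed, and the join made sure it was done before we read it.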

The complete code:
// main.c
#include <stdio.h>
#include <pthread.h>
#include <assert.h>

void* a(void* data)
{
    for ( ; ; ) {
        printf("a");
    }
    return NULL;
}

void* b(void* data) 
{
    for ( ; ; ) {
        printf("b");
    }
    return NULL;
}

int main()
{
    pthread_t ta, tb;
    assert(pthread_create(&ta, NULL, a, NULL) == 0);
    assert(pthread_create(&tb, NULL, b, NULL) == 0);
    assert(pthread_join(ta, NULL) == 0);
    assert(pthread_join(tb, NULL) == 0);
    return 0;
}

Compiling the code above with
gcc -pthread main.c
and running it with
./a.out
should print something like "aaaabbbbbaaaabbbbb" (the exact interleaving is up to the scheduler). Since the program will never terminate, it can be interrupted with
Ctrl+C
which is going to send
SIGINT
to the program which, by default, will make it terminate.
>>

File: train semaphore.jpg ( 57.63 KB , 622x900 , 1655555395291.jpg )

What is a semaphore?
In programming, a semaphore is a simple integer with two operations:
++
(increment, signal, post,
V(s)
) and
--
(decrement, wait,
P(s)
). Unlike a simple integer, however, it has a special property around the value zero:

  • (--) If you are a thread (or a task, or a process) and you attempt to decrement a semaphore that's already zero, the semaphore value stays zero, but you get blocked. The only way you can get unblocked is if somebody unblocks you by attempting to increment the semaphore. Otherwise - if the semaphore value is greater than zero - its value is decremented by one and you continue your execution.
  • (++) If you are a thread (or a task, or a process) and you attempt to increment a semaphore, then its value will be incremented by one ONLY IF the value of the semaphore is greater than zero OR if it's zero and there are no blocked threads on the semaphore. If the semaphore value is zero and there are blocked threads (i.e. they attempted to decrement it below zero) then the semaphore value stays zero, but one thread is unblocked. In any case, the execution of those threads that call an increment continues without blocking.

It's not possible for the semaphore value to go below zero, since any attempt to decrement it below zero will get you blocked. Similarly, it's not possible to have threads blocked on a semaphore of value greater than zero, since those threads would have to be unblocked first before incrementing it above zero.

A pseudo-C++ code of how a semaphore works is:
class Sem {
    volatile int m_lock;
    int m_value;
    queue<Thread> m_queue;   // threads blocked on this semaphore
public:
    Sem(int value) : m_lock(0), m_value(value) {}

    void wait() {
        lock(&m_lock);
        if (m_value > 0) {
            m_value -= 1;
            unlock(&m_lock);
        } else {
            // m_value == 0: queue ourselves and release the lock BEFORE
            // blocking, otherwise no post() could ever acquire it to wake us
            // (block() must not miss an unblock() that arrives in between)
            m_queue.push(thread_running());
            unlock(&m_lock);
            block(thread_running());
        }
    }

    void post() {
        lock(&m_lock);
        if (m_value > 0) {
            m_value += 1;
        } else if (m_value == 0) {
            if (m_queue.size() == 0) {
                m_value += 1;
            } else {
                Thread thread = m_queue.pop();
                unblock(thread);
            }
        }
        unlock(&m_lock);
    }
};

lock()
could be implemented using a busy wait
while (test_and_set(&m_lock) == 1);
while
unlock()
could be implemented as a simple
m_lock = 0;
.

block()
could be also implemented with a busy wait
while (__blocked);
, while
unblock()
could break that loop e.g. with
__blocked = 0;
, but every thread would have to have its own
volatile int __blocked;
variable.
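The busy-wait lock()/unlock() pair described above can be sketched in real C. This is only an illustration, assuming a C11 compiler: atomic_flag_test_and_set() from <stdatomic.h> plays the role of test_and_set(), and the names worker, counter and spin_demo are made up for the demo.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

static atomic_flag m_lock = ATOMIC_FLAG_INIT;
static long counter = 0;

/* Spin until the flag was clear, i.e. test-and-set returned "not set". */
static void lock(atomic_flag *l)   { while (atomic_flag_test_and_set(l)) ; }
static void unlock(atomic_flag *l) { atomic_flag_clear(l); }

static void *worker(void *data)
{
    (void)data;
    for (int i = 0; i < 100000; i++) {
        lock(&m_lock);
        counter += 1;       /* protected by the spin lock */
        unlock(&m_lock);
    }
    return NULL;
}

long spin_demo(void)
{
    pthread_t t[4];
    counter = 0;
    for (int i = 0; i < 4; i++)
        assert(pthread_create(&t[i], NULL, worker, NULL) == 0);
    for (int i = 0; i < 4; i++)
        assert(pthread_join(t[i], NULL) == 0);
    return counter;
}
```

With the lock in place spin_demo() returns 400000 every time. A busy wait burns CPU while it waits, which is why it only makes sense for very short critical sections like the inside of the semaphore itself.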

POSIX semaphore
POSIX semaphores can be included with
#include <semaphore.h>
which gives us four functions of importance:
sem_init();     // create a semaphore with initial value
sem_post();     // signal (increment)
sem_wait();     // wait   (decrement)
sem_destroy();  // destroy a semaphore
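A small single-threaded sketch of the zero rule, using the four functions above plus two more from the same header: sem_trywait() (a non-blocking wait) and sem_getvalue(). The function name sem_value_demo is made up for the example. Note that, unlike pthread functions, the sem_* functions return -1 and set errno on failure.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Walks a semaphore through 1 -> 0 -> (would block) -> 1 -> 2. */
int sem_value_demo(void)
{
    sem_t s;
    int value = -1;

    assert(sem_init(&s, 0, 1) == 0);    /* value = 1 */
    assert(sem_wait(&s) == 0);          /* 1 -> 0, no blocking */

    /* A plain sem_wait() would block here forever (nobody will post),
       so use the non-blocking variant, which fails with EAGAIN instead. */
    assert(sem_trywait(&s) == -1 && errno == EAGAIN);

    assert(sem_post(&s) == 0);          /* 0 -> 1, nobody was waiting */
    assert(sem_post(&s) == 0);          /* 1 -> 2 */
    assert(sem_getvalue(&s, &value) == 0);
    assert(sem_destroy(&s) == 0);
    return value;
}
```

sem_value_demo() returns 2: the failed sem_trywait() left the value at zero, and the two posts raised it from there.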


We'll now see some classical problems solved using POSIX semaphores:
>>

File: syncrhonization.png ( 59.01 KB , 1544x780 , 1655559365310.png )

Thread synchronization
So the code in the OP successfully creates two threads and allows them to run concurrently. However, they are overcompetitive and print as many characters as they can as soon as they get the opportunity. The scheduler grants a certain execution time to every thread but it's not always fair. For example, it can give a longer time slice to thread A than to thread B, leading to a long string of "aaaaaaaaaaaa...".

How would we make the two threads agree to print e.g. "abababababab"? The basic idea is as follows: allow thread A to signal thread B and block itself, and allow thread B to signal thread A and block itself. Thus, A and B will alternate between blocking and signaling each other.

Using semaphores, we'll have semaphore
sa = 1
and semaphore
sb = 0
(two semaphores in total). Thread B will immediately attempt to decrement the semaphore
sb
and thus block itself because the value of the semaphore is zero. Thread A will successfully decrement
sa
from 1 to 0, but it will block itself in the next iteration - but not before it unblocks Thread B on semaphore
sb
. Thread B will then unblock A, and the process repeats.


POSIX semaphore initialization:
sem_t sa, sb;
int main() {
    sem_init(&sa, 0, 1);
    sem_init(&sb, 0, 0);
    ...
    sem_destroy(&sa);
    sem_destroy(&sb);
    return 0;
}

Thread A:
void* a(void* data) {
    for ( ; ; ) {
        sem_wait(&sa);
        printf("a");
        sem_post(&sb);
    }
    return NULL;
}

Thread B:
void* b(void* data) {
    for ( ; ; ) {
        sem_wait(&sb);
        printf("b");
        sem_post(&sa);
    }
    return NULL;
}

We can also protect ourselves against errors using
assert()
and also add some colors through ANSI escape codes. ;)

The full code:
// main.c
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <assert.h>

#define RED "\e[31m"
#define GREEN "\e[32m"
#define NOCOLOR "\e[0m"

sem_t sa, sb;
void* a(void* data)
{
    for ( ; ; ) {
        assert(sem_wait(&sa) == 0);
        printf(RED "a" NOCOLOR);
        assert(sem_post(&sb) == 0);
    }
    return NULL;
}

void* b(void* data)
{
    for ( ; ; ) {
        assert(sem_wait(&sb) == 0);
        printf(GREEN "b" NOCOLOR);
        assert(sem_post(&sa) == 0);
    }
    return NULL;
}

int main()
{
    assert(sem_init(&sa, 0, 1) == 0);
    assert(sem_init(&sb, 0, 0) == 0);
    pthread_t ta, tb;
    assert(pthread_create(&ta, NULL, a, NULL) == 0);
    assert(pthread_create(&tb, NULL, b, NULL) == 0);
    assert(pthread_join(ta, NULL) == 0);
    assert(pthread_join(tb, NULL) == 0);
    assert(sem_destroy(&sa) == 0);
    assert(sem_destroy(&sb) == 0);
    return 0;
}


Compile with
gcc -pthread main.c
and run with
./a.out
. Cancel with
Ctrl + C
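Since the loops above never end, it's hard to check the ordering automatically. Here is a bounded variant, only a sketch: the same two semaphores (sa = 1, sb = 0), but each thread does a fixed number of rounds and writes into a string instead of stdout. ROUNDS, out, pos and alternate are names made up for this example.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>

#define ROUNDS 5

static sem_t sa, sb;
static char out[2 * ROUNDS + 1];
static int pos;     /* only touched by whoever currently holds the turn */

static void *a(void *data)
{
    (void)data;
    for (int i = 0; i < ROUNDS; i++) {
        assert(sem_wait(&sa) == 0);
        out[pos++] = 'a';
        assert(sem_post(&sb) == 0);
    }
    return NULL;
}

static void *b(void *data)
{
    (void)data;
    for (int i = 0; i < ROUNDS; i++) {
        assert(sem_wait(&sb) == 0);
        out[pos++] = 'b';
        assert(sem_post(&sa) == 0);
    }
    return NULL;
}

const char *alternate(void)
{
    pthread_t ta, tb;
    pos = 0;
    memset(out, 0, sizeof out);
    assert(sem_init(&sa, 0, 1) == 0);   /* A goes first */
    assert(sem_init(&sb, 0, 0) == 0);
    assert(pthread_create(&ta, NULL, a, NULL) == 0);
    assert(pthread_create(&tb, NULL, b, NULL) == 0);
    assert(pthread_join(ta, NULL) == 0);
    assert(pthread_join(tb, NULL) == 0);
    assert(sem_destroy(&sa) == 0);
    assert(sem_destroy(&sb) == 0);
    return out;
}
```

alternate() returns "ababababab" no matter how the scheduler interleaves the two threads, because each thread can only proceed after the other one has posted its semaphore.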
>>

File: mutex.png ( 91.2 KB , 1588x916 , 1655563647988.png )

Mutual exclusion problem

Assume two threads want to copy a bulk of data into the same buffer. For example, thread A does
memset(buffer, 'a', length)
while thread B does
memset(buffer, 'b', length)
. We'll assume length is arbitrary but smaller than the buffer length (
int length = (rand() % sizeof(buffer));
).

Because copying something is rather slow, multiple threads can concurrently write into buffer and overwrite each other's data. If there were also threads reading from the buffer, they would be utterly confused how to interpret the data because it's a jumbled mess that's been overwritten multiple times by different threads and in different places.

Thus, writing to and reading from a shared buffer should be atomic: only one thread should be able to have access to the buffer during its operations with it. Using semaphores, this is done by creating a mutex semaphore initialized to 1,
mutex = 1
. A mutex can be obtained (locked) and released (unlocked) by attempting to decrement or increment the mutex semaphore respectively.

The first thread to "lock" the mutex - by decrementing it from 1 to 0 - will continue on using the buffer, and since the value of the semaphore is now zero every other thread that attempts to obtain the mutex will be blocked. Once the thread releases the mutex, other threads have the chance to obtain it. If a thread never releases a mutex in its possession, other threads will remain permanently blocked. If a thread locks its own mutex twice, it'll block itself and every thread will be in a permanent deadlock.
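The self-deadlock case is easy to hit by accident. As an aside (this is not something the semaphore-based mutex can do): pthreads has its own pthread_mutex_t, and when it is created with the PTHREAD_MUTEX_ERRORCHECK type, a second lock attempt by the owning thread fails with EDEADLK instead of blocking. A minimal sketch, with relock_result being a made-up name:

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Returns what the second lock attempt by the same thread reports. */
int relock_result(void)
{
    pthread_mutexattr_t attr;
    pthread_mutex_t m;

    assert(pthread_mutexattr_init(&attr) == 0);
    assert(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK) == 0);
    assert(pthread_mutex_init(&m, &attr) == 0);
    assert(pthread_mutexattr_destroy(&attr) == 0);

    assert(pthread_mutex_lock(&m) == 0);
    int rc = pthread_mutex_lock(&m);    /* would deadlock a default mutex */

    assert(pthread_mutex_unlock(&m) == 0);
    assert(pthread_mutex_destroy(&m) == 0);
    return rc;
}
```

relock_result() returns EDEADLK, which makes this kind of bug visible during testing instead of hanging the program.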


Implementation:

Initialization:
sem_t mutex;
int main() {
    sem_init(&mutex, 0, 1);
    ...
    sem_destroy(&mutex);
}

Thread A:
void* a(void* data) {
    for ( ; ; ) {
        sem_wait(&mutex);
        memset(buffer, 'a', sizeof(buffer));
        sem_post(&mutex);
    }
    return NULL;
} 

Thread B:
void* b(void* data) {
    for ( ; ; ) {
        sem_wait(&mutex);
        memset(buffer, 'b', sizeof(buffer));
        sem_post(&mutex);
    }
    return NULL;
} 


Full code:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>

#define RED(s) ("\e[7m\e[31m" s "\e[0m")
#define GREEN(s) ("\e[32m" s "\e[0m")
#define SIZE 1024

char buffer[SIZE * BUFSIZ];

sem_t mutex;
void* a(void* data)
{
    for ( ; ; ) {
        assert(sem_wait(&mutex) == 0);
        printf(RED("(---"));
        memset(buffer, 'a', (rand() % sizeof(buffer)));
        printf(RED("---)"));
        assert(sem_post(&mutex) == 0);

        usleep(1);
    }
    return NULL;
}

void* b(void* data)
{
    for ( ; ; ) {
        assert(sem_wait(&mutex) == 0);
        printf(GREEN("(---"));
        memset(buffer, 'b', (rand() % sizeof(buffer)));
        printf(GREEN("---)"));
        assert(sem_post(&mutex) == 0);

        usleep(1);
    }
    return NULL;
}

int main()
{
    assert(sem_init(&mutex, 0, 1) == 0);
    pthread_t ta, tb;
    assert(pthread_create(&ta, NULL, a, NULL) == 0);
    assert(pthread_create(&tb, NULL, b, NULL) == 0);
    assert(pthread_join(ta, NULL) == 0);
    assert(pthread_join(tb, NULL) == 0);
    assert(sem_destroy(&mutex) == 0);
    return 0;
}
>>

File: rounded-buffer.png ( 39.18 KB , 767x513 , 1655574754738.png )

Producers and Consumers
Assume there are
N
producer threads,
M
consumer threads, and a shared circular buffer of size
S
(pic related). Producers always put data in the shared buffer, while consumers always take data from the shared buffer.

If producers generate data too quickly, they will overflow the buffer and overwrite items that haven't been consumed yet. Similarly, if consumers fetch data too quickly, the buffer will empty out and they would start fetching invalid or obsolete data. In addition, the buffer has to be protected with a mutex lock, otherwise producers would be competing for the same slot, and consumers could fetch the same data.

The latter problem is solved by surrounding buffer operations with a mutex lock/unlock. This makes the buffer MT-Safe. The former problem is solved in a way similar to how A-B synchronization was solved in >>3 ("abababababababab..."). Producers and consumers will block themselves if the buffer is full or empty respectively, but they will also signal each other each time they put data into or take data out of the buffer.

Implementation

POSIX Semaphores used:
sem_t mutex, full, empty;
int main() {
    sem_init(&mutex, 0, 1);
    sem_init(&full,  0, BUFFER_SIZE);
    sem_init(&empty, 0, 0);
    ...
    sem_destroy(&mutex);
    sem_destroy(&full);
    sem_destroy(&empty);
}


Producer threads:
void* producer(void* data)
{
    for ( ; ; ) {
        auto item = PRODUCE_DATA();
        sem_wait(&full);    /* "full" counts free slots: wait for one */
        sem_wait(&mutex);
        BUFFER_PUT(buffer, item);
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&empty) == 0);  /* "empty" counts stored items */
    }
    return NULL;
}


Consumer threads:
void* consumer(void* data)
{
    for ( ; ; ) {
        sem_wait(&empty);   /* wait until there is an item to take */
        sem_wait(&mutex);
        auto item = BUFFER_GET(buffer);
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&full) == 0);   /* one more free slot */
        CONSUME_DATA(item);
    }
    return NULL;
}


Full code:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>

#define RED(s) ("\e[7m\e[31m" s "\e[0m")
#define GREEN(s) ("\e[32m" s "\e[0m")
#define BUFFER_SIZE 10
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 10

int buffer[BUFFER_SIZE], head = 0, tail = 0, count = 0;
sem_t mutex, empty, full;
void* producer(void* data)
{
    for ( ; ; ) {
        /* produce data */
        int x = rand() % 1000;

        /* put into buffer (thread-safe) */
        assert(sem_wait(&full) == 0);
        assert(sem_wait(&mutex) == 0);
        buffer[head] = x;
        head = (head + 1) % BUFFER_SIZE;
        count += 1;
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&empty) == 0);
    }
    return NULL;
}

void* consumer(void* data)
{
    for ( ; ; ) {
        /* get data from buffer (thread-safe) */
        assert(sem_wait(&empty) == 0);
        assert(sem_wait(&mutex) == 0);
        int x = buffer[tail];
        tail = (tail + 1) % BUFFER_SIZE;
        count -= 1;
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&full) == 0);

        /* consume data */
        printf("%d ", x);
    }
    return NULL;
}

int main()
{
    assert(sem_init(&mutex, 0, 1) == 0);
    assert(sem_init(&full,  0, BUFFER_SIZE) == 0);
    assert(sem_init(&empty, 0, 0) == 0);

    pthread_t tc[CONSUMER_COUNT], tp[PRODUCER_COUNT];
    int i;
    for (i = 0; i < CONSUMER_COUNT; i++)
        assert(pthread_create(&tc[i], NULL, consumer, NULL) == 0);
    for (i = 0; i < PRODUCER_COUNT; i++)
        assert(pthread_create(&tp[i], NULL, producer, NULL) == 0);
    for (i = 0; i < CONSUMER_COUNT; i++)
        assert(pthread_join(tc[i], NULL) == 0);
    for (i = 0; i < PRODUCER_COUNT; i++)
        assert(pthread_join(tp[i], NULL) == 0);

    assert(sem_destroy(&mutex) == 0);
    assert(sem_destroy(&empty) == 0);
    assert(sem_destroy(&full) == 0);
    return 0;
}

Compile with
gcc -pthread main.c
and run with
./a.out
, interrupt with
CTRL+C
.
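To check that nothing gets lost or fetched twice, here is a bounded sketch of the same scheme: two producers each put the numbers 1..100 into a 4-slot buffer, two consumers each take 100 items and add them up. ITEMS and produce_consume are names made up for this example; the semaphores keep the names used above (full counts free slots, empty counts stored items).

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 4
#define ITEMS 100               /* items per producer and per consumer */

static int buffer[BUFFER_SIZE], head, tail;
static sem_t mutex, full, empty;

static void *producer(void *data)
{
    (void)data;
    for (int x = 1; x <= ITEMS; x++) {
        assert(sem_wait(&full) == 0);   /* wait for a free slot */
        assert(sem_wait(&mutex) == 0);
        buffer[head] = x;
        head = (head + 1) % BUFFER_SIZE;
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&empty) == 0);  /* one more item available */
    }
    return NULL;
}

static void *consumer(void *data)
{
    long *sum = data;                   /* per-consumer running total */
    for (int i = 0; i < ITEMS; i++) {
        assert(sem_wait(&empty) == 0);  /* wait for an item */
        assert(sem_wait(&mutex) == 0);
        int x = buffer[tail];
        tail = (tail + 1) % BUFFER_SIZE;
        assert(sem_post(&mutex) == 0);
        assert(sem_post(&full) == 0);   /* one more free slot */
        *sum += x;
    }
    return NULL;
}

long produce_consume(void)
{
    pthread_t tp[2], tc[2];
    long sums[2] = {0, 0};

    head = tail = 0;
    assert(sem_init(&mutex, 0, 1) == 0);
    assert(sem_init(&full, 0, BUFFER_SIZE) == 0);
    assert(sem_init(&empty, 0, 0) == 0);
    for (int i = 0; i < 2; i++) {
        assert(pthread_create(&tc[i], NULL, consumer, &sums[i]) == 0);
        assert(pthread_create(&tp[i], NULL, producer, NULL) == 0);
    }
    for (int i = 0; i < 2; i++) {
        assert(pthread_join(tp[i], NULL) == 0);
        assert(pthread_join(tc[i], NULL) == 0);
    }
    assert(sem_destroy(&mutex) == 0);
    assert(sem_destroy(&full) == 0);
    assert(sem_destroy(&empty) == 0);
    return sums[0] + sums[1];
}
```

Every item is consumed exactly once, so produce_consume() returns 2 * (1 + 2 + ... + 100) = 10100 regardless of scheduling.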
>>

File: rwlock.png ( 32.35 KB , 525x338 , 1655656617191.png )

Readers and Writers
Assume there are
N
reader threads and
M
writer threads and a shared file. Reader threads should be able to read the file concurrently, but only one writer at a time should be able to modify the file, and not while anyone is reading it. The readers' lock is often called a "shared lock" while the writers' lock is often called an "exclusive lock".

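In Unix, this type of lock is already provided by the OS through
flock()
(declared in <sys/file.h>). Note that the lock belongs to the open file description, so every reader and writer has to open() the file itself for the locks to exclude each other: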
void MT_safe_read(int fd) 
{   
    flock(fd, LOCK_SH);

    /* Read */

    flock(fd, LOCK_UN);
}

void MT_safe_write(int fd) 
{
    flock(fd, LOCK_EX);
    
    /* Write */
    
    flock(fd, LOCK_UN);
}


In addition, POSIX threads (pthreads) also have an implementation of a read/write lock:
void MT_safe_read(pthread_rwlock_t *rw) 
{   
    pthread_rwlock_rdlock(rw);

    /* Read */

    pthread_rwlock_unlock(rw);
}

void MT_safe_write(pthread_rwlock_t *rw) 
{
    pthread_rwlock_wrlock(rw);

    /* Write */

    pthread_rwlock_unlock(rw);
}
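The shared/exclusive behaviour can be observed without any extra threads by using the non-blocking variants pthread_rwlock_tryrdlock() and pthread_rwlock_trywrlock(). This is only a sketch; rwlock_demo is a made-up name.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Returns 1 if the lock was shared for readers and exclusive for writers. */
int rwlock_demo(void)
{
    pthread_rwlock_t rw;
    assert(pthread_rwlock_init(&rw, NULL) == 0);

    assert(pthread_rwlock_rdlock(&rw) == 0);        /* first reader */
    int reader2 = pthread_rwlock_tryrdlock(&rw);    /* second shared lock */
    int writer1 = pthread_rwlock_trywrlock(&rw);    /* refused while read-locked */

    if (reader2 == 0)
        assert(pthread_rwlock_unlock(&rw) == 0);    /* release second read lock */
    assert(pthread_rwlock_unlock(&rw) == 0);        /* release first read lock */

    int writer2 = pthread_rwlock_trywrlock(&rw);    /* lock is free now */
    if (writer2 == 0)
        assert(pthread_rwlock_unlock(&rw) == 0);

    assert(pthread_rwlock_destroy(&rw) == 0);
    return reader2 == 0 && writer1 == EBUSY && writer2 == 0;
}
```

The second read lock is granted right away, the write attempt is refused with EBUSY while readers hold the lock, and it succeeds once they are gone.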


Using POSIX semaphores, RW-lock logic will have to be implemented manually. The basic idea is: a reader (or writer) will check if they can read (or write) before locking the file with a shared (or exclusive) lock. If they find themselves unable to lock the file, they will put themselves in the "want-to-read" waiting queue (or "want-to-write" waiting queue). Once any thread finishes reading (or writing) it will check if there is anybody in the "want-to-write" waiting queue & writing is allowed, or if there is anybody in the "want-to-read" waiting queue & reading is allowed. If yes, they will be let through. If not, the mutex will be released (the check has to be atomic, hence the mutex).

A pseudo-code of this logic would look like:

void rw_lock(...) {
    /* Acquire lock */
    wait(MUTEX)
    
    /* Check condition */
    if (lock) {
        if (isReader) {
            if (! can_read) {
                signal(MUTEX)
                wait(R_QUEUE)
            }
        } else if (isWriter) {
            if (! can_write) {
                signal(MUTEX)
                wait(W_QUEUE)
            }
        }
    }
    
    /* Allow next */
    if (writers_waiting > 0 && can_write) {
        signal(W_QUEUE)
    } else if (readers_waiting > 0 && can_read) {
        signal(R_QUEUE)
    } else {
        signal(MUTEX)
    }
}

can_read
and
can_write
conditions are just
int can_read = (writer_count == 0);
int can_write = (reader_count == 0 && writer_count == 0);

The counters
writer_count
,
reader_count
,
writers_waiting
,
readers_waiting
now just have to be managed properly.

Drawback of this implementation: writer starvation is a possibility if there are too many reader threads acquiring shared locks too quickly.

Implementation:

rw.h
#ifndef __RW_H__
#define __RW_H__
#include <semaphore.h>
enum {
    RW_LOCK   = 1,
    RW_UNLOCK = 2,
    RW_READER = 4,
    RW_WRITER = 8,
};

struct rw_lock_t {
    int w_count;
    int r_count;
    int w_waiting;
    int r_waiting;
    sem_t m_mutex;
    sem_t r_queue;
    sem_t w_queue;
};

int rw_init(struct rw_lock_t *rw);
int rw_lock(struct rw_lock_t *rw, int flags);
int rw_destroy(struct rw_lock_t *rw);

#endif


rw.c
#include "rw.h"

int rw_init(struct rw_lock_t *rw)
{
    rw->w_count = 0;
    rw->r_count = 0;
    rw->w_waiting = 0;
    rw->r_waiting  = 0;
    sem_init(&rw->m_mutex, 0, 1);
    sem_init(&rw->r_queue, 0, 0);
    sem_init(&rw->w_queue, 0, 0);
    return 0;
}

int rw_destroy(struct rw_lock_t *rw)
{
    sem_destroy(&rw->r_queue);
    sem_destroy(&rw->w_queue);
    sem_destroy(&rw->m_mutex);
    return 0;
}

int rw_lock(struct rw_lock_t *rw, int flags)
{
    sem_wait(&rw->m_mutex);
    switch ((flags & RW_LOCK) | (flags & RW_UNLOCK)) {
        case RW_LOCK: {
            if (flags & RW_READER) {
                if (rw->w_count > 0) {
                    rw->r_waiting += 1;
                    sem_post(&rw->m_mutex);
                    sem_wait(&rw->r_queue);
                }
                rw->r_count += 1;
            } else if (flags & RW_WRITER) {
                if (rw->w_count > 0 || rw->r_count > 0) {
                    rw->w_waiting += 1;
                    sem_post(&rw->m_mutex);
                    sem_wait(&rw->w_queue);
                }
                rw->w_count += 1;
            }
        } break;
        case RW_UNLOCK: {
            if (flags & RW_READER) {
                rw->r_count -= 1;
            } else if (flags & RW_WRITER) {
                rw->w_count -= 1;
            }
        } break;
    }
    if (rw->w_waiting > 0 && rw->w_count == 0 && rw->r_count == 0) {
        rw->w_waiting -= 1;
        sem_post(&rw->w_queue);
    } else if (rw->r_waiting > 0 && rw->w_count == 0) {
        rw->r_waiting -= 1;
        sem_post(&rw->r_queue);
    } else {
        sem_post(&rw->m_mutex);
    }
    return 0;
}


main.c
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include "rw.h"

#define RED(s) ("\e[7m\e[31m" s "\e[0m")
#define GREEN(s) ("\e[32m" s "\e[0m")
#define READER_COUNT 12
#define WRITER_COUNT 5

struct rw_lock_t rw;
void* reader(void* data)
{
    for ( ; ; ) {
        rw_lock(&rw, RW_LOCK | RW_READER);
        /* Read data */
        printf(GREEN(" r "));
        rw_lock(&rw, RW_UNLOCK | RW_READER);
        usleep(4);
    }
    return NULL;
}

void* writer(void* data)
{
    for ( ; ; ) {
        rw_lock(&rw, RW_LOCK | RW_WRITER);
        /* Write data */
        printf(RED(" w "));
        rw_lock(&rw, RW_UNLOCK | RW_WRITER);
        usleep(4);
    }
    return NULL;
}

int main()
{
    rw_init(&rw);
    pthread_t tr[READER_COUNT], tw[WRITER_COUNT];
    int i;
    for (i = 0; i < READER_COUNT; i++)
        assert(pthread_create(&tr[i], NULL, reader, NULL) == 0);
    for (i = 0; i < WRITER_COUNT; i++)
        assert(pthread_create(&tw[i], NULL, writer, NULL) == 0);
    for (i = 0; i < READER_COUNT; i++)
        assert(pthread_join(tr[i], NULL) == 0);
    for (i = 0; i < WRITER_COUNT; i++)
        assert(pthread_join(tw[i], NULL) == 0);
    rw_destroy(&rw);
    return 0;
}


Compile with
gcc -pthread main.c rw.c
, run
./a.out
. Interrupt with
Ctrl + C
.
>>

File: barrier.jpeg ( 97.42 KB , 1200x725 , 1655995461457.jpeg )

Barrier synchronization
Barriers are used to accumulate a certain number of threads at the barrier before letting them continue.

Assume there are 5 threads downloading video clips from the internet for the purpose of splicing them together. Some threads will finish quicker than others depending on download speed and the size of the clip. In any case, every downloader thread has to wait for the other four to finish downloading before splicing can happen.

Barriers are natively implemented by pthreads:
static int count = 5;
pthread_barrier_t bar;

void *thread(void *data) {
    for ( ; ; ) {
        ...
        pthread_barrier_wait(&bar);
        ...
    }
}

int main() {
    ...
    pthread_barrier_init(&bar, NULL, count);
    ...
    pthread_barrier_destroy(&bar);
}
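A bounded sketch of the pthreads barrier in action, loosely following the downloader example: five threads each record that they "finished downloading" and then wait at the barrier. After the barrier, every thread must see all five arrivals. The names downloader, arrived, serial, saw_all and barrier_demo are made up for this example, and the counters use C11 atomics so that no extra mutex is needed.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define N 5

static pthread_barrier_t bar;
static atomic_int arrived, serial, saw_all;

static void *downloader(void *data)
{
    (void)data;
    atomic_fetch_add(&arrived, 1);          /* "my clip is downloaded" */

    int rc = pthread_barrier_wait(&bar);    /* wait for the other four */
    assert(rc == 0 || rc == PTHREAD_BARRIER_SERIAL_THREAD);
    if (rc == PTHREAD_BARRIER_SERIAL_THREAD)
        atomic_fetch_add(&serial, 1);       /* exactly one thread gets this */

    if (atomic_load(&arrived) == N)         /* everybody arrived before us */
        atomic_fetch_add(&saw_all, 1);
    return NULL;
}

int barrier_demo(void)
{
    pthread_t t[N];
    atomic_store(&arrived, 0);
    atomic_store(&serial, 0);
    atomic_store(&saw_all, 0);
    assert(pthread_barrier_init(&bar, NULL, N) == 0);
    for (int i = 0; i < N; i++)
        assert(pthread_create(&t[i], NULL, downloader, NULL) == 0);
    for (int i = 0; i < N; i++)
        assert(pthread_join(t[i], NULL) == 0);
    assert(pthread_barrier_destroy(&bar) == 0);
    return atomic_load(&serial) == 1 && atomic_load(&saw_all) == N;
}
```

barrier_demo() returns 1. The PTHREAD_BARRIER_SERIAL_THREAD return value is handy for the splicing step: exactly one thread receives it per barrier round, so that thread can do the work that must happen once.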


Implementation using POSIX semaphores involves simply counting the number of threads blocked on the barrier and flushing all of them after a certain number is reached. The counter variable has to be protected with a mutex of course.

Implementation
bar.h
#ifndef __BARRIER_H__
#define __BARRIER_H__
#include <semaphore.h>
#include <stdlib.h>
struct barrier_t {
    sem_t b_mutex;
    sem_t b_barrier;
    int b_required;
    int b_count;
    void* b_retval;
};

void bar_init(struct barrier_t *bar, int required);
void bar_wait(struct barrier_t *bar);
void bar_destroy(struct barrier_t *bar);

#endif


bar.c
#include "bar.h"

void bar_init(struct barrier_t *bar, int required)
{
    sem_init(&bar->b_mutex, 0, 1);
    sem_init(&bar->b_barrier, 0, 0);
    bar->b_required = required;
    bar->b_count = 0;
    bar->b_retval = NULL;
}

void bar_wait(struct barrier_t *bar)
{
    sem_wait(&bar->b_mutex);
    bar->b_count += 1;
    if (bar->b_count == bar->b_required) {
        bar->b_count = 0;
        int i;
        for (i = 0; i < bar->b_required; i++)
            sem_post(&bar->b_barrier);
    }
    sem_post(&bar->b_mutex);
    sem_wait(&bar->b_barrier);
}

void bar_destroy(struct barrier_t *bar)
{
    sem_destroy(&bar->b_mutex);
    sem_destroy(&bar->b_barrier);
}


main.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "bar.h"

#define B 4
#define N 12

#define REV "\e[7m"
#define COLOR(c) "\e[" c "m"
#define NONE "\e[0m"
#define RED  31

struct barrier_t bar[B];

void *thread_func(void *data)
{
    int *id = (int*)(data);
    const char *fmt = REV COLOR("%d") " %d " NONE; 
    int iter = 0, prev = 0;
    for ( ; ; ) {
        prev = iter;
        iter = (iter + 1) % B;
        bar_wait(&bar[iter]); /* try commenting out this line ;) */
        printf(fmt, RED + iter, *id);
        usleep(50);
    }
    return (void*)id;
} 

int main()
{
    pthread_t thread[N];
    int i;
    for (i = 0; i < B; i++)
        bar_init(&bar[i], N);
    for (i = 0; i < N; i++) {
        int *id = calloc(1, sizeof(int));
        *id = i;
        pthread_create(&thread[i], NULL, thread_func, (void*)id);
    }
    for (int i = 0; i < N; i++) {
        int *id = NULL;
        pthread_join(thread[i], (void**)&id);
        free(id);
    }
    for (i = 0; i < B; i++)
        bar_destroy(&bar[i]);
    return 0;
}


Compile using
gcc -pthread main.c bar.c
, run with
./a.out
. Interrupt with
CTRL + C
.
>>
Thank you anonerd.
>>
Wow. Did you write this or is this copied from somewhere?
>>
>>1
Thank you anon. I didn't get the callback function part. Can you explain
void* (*cb)(void*);
and
assert(pthread_create(&ta, NULL, a, NULL) == 0);