[ home ] [ math / cs / ai / phy / as / chem / bio / geo ] [ civ / aero / mech / ee / hdl / os / dev / web / app / sys / net / sec ] [ med / fin / psy / soc / his / lit / lin / phi / arch ] [ off / vg / jp / 2hu / tc / ts / adv / hr / meta / tex ] [ chat ] [ wiki ]

Viewing source code

The following is the source code for post >>>/os/6

\textbf{Readers and Writers}
Assume there are \[N\] reader threads, \[M\] writer threads and a shared file. Reader threads should be able to read the file concurrently, but only one writer at a time should be able to modify it. A readers' lock is often called a "shared lock", while a writers' lock is often called an "exclusive lock".

In Unix, this type of lock is already provided by the OS through \`flock()\`:
```

/*
 * Read from fd while holding a shared (reader) flock() lock.
 *
 * fd: open file descriptor to lock and read from.
 *
 * If the shared lock cannot be taken, the function returns without reading
 * instead of touching the file unprotected.
 *
 * NOTE(review): flock() locks are attached to the open file description, so
 * threads that share one fd presumably do not exclude each other -- each
 * thread likely needs its own open() of the file; confirm against flock(2).
 */
void MT_safe_read(int fd)
{
    if (flock(fd, LOCK_SH) != 0) {
        return; /* lock not acquired: do not read */
    }

    /* Read */

    flock(fd, LOCK_UN);
}

/*
 * Modify the file behind fd while holding an exclusive (writer) flock() lock.
 *
 * fd: open file descriptor to lock and write to.
 *
 * If the exclusive lock cannot be taken, the function returns without
 * writing instead of modifying the file unprotected.
 *
 * NOTE(review): flock() locks are attached to the open file description, so
 * threads that share one fd presumably do not exclude each other -- each
 * thread likely needs its own open() of the file; confirm against flock(2).
 */
void MT_safe_write(int fd)
{
    if (flock(fd, LOCK_EX) != 0) {
        return; /* lock not acquired: do not write */
    }

    /* Write */

    flock(fd, LOCK_UN);
}
```

In addition, POSIX threads (pthreads) also have an implementation of a read/write lock:
```
/*
 * Read the shared data while holding rw as a reader (shared lock).
 *
 * rw: initialised pthread read/write lock protecting the data.
 *
 * pthread_rwlock_rdlock() reports failure through a non-zero return value;
 * in that case the function returns without reading, because the lock is
 * not held and must not be unlocked.
 */
void MT_safe_read(pthread_rwlock_t *rw)
{
    if (pthread_rwlock_rdlock(rw) != 0) {
        return; /* lock not acquired: do not read */
    }

    /* Read */

    pthread_rwlock_unlock(rw);
}

/*
 * Modify the shared data while holding rw as a writer (exclusive lock).
 *
 * rw: initialised pthread read/write lock protecting the data.
 *
 * pthread_rwlock_wrlock() reports failure through a non-zero return value;
 * in that case the function returns without writing, because the lock is
 * not held and must not be unlocked.
 */
void MT_safe_write(pthread_rwlock_t *rw)
{
    if (pthread_rwlock_wrlock(rw) != 0) {
        return; /* lock not acquired: do not write */
    }

    /* Write */

    pthread_rwlock_unlock(rw);
}
```

Using POSIX semaphores, the RW-lock logic has to be implemented manually. The basic idea is: a reader (or writer) checks whether it can read (or write) before locking the file with a shared (or exclusive) lock. If it cannot lock the file, it puts itself in the "want-to-read" waiting queue (or the "want-to-write" waiting queue). Once any process finishes reading (or writing), it checks whether anybody is in the "want-to-write" waiting queue and writing is allowed, or whether anybody is in the "want-to-read" waiting queue and reading is allowed. If so, that waiter is let through. If not, the mutex is released (the check has to be atomic, which is why a mutex is necessary).

A pseudo-code of this logic would look like:

```
/*
 * Pseudo-code for the semaphore-based lock/unlock entry point.
 * MUTEX guards the check below; R_QUEUE and W_QUEUE are where readers and
 * writers wait when they cannot proceed yet.
 */
void rw_lock(...) {
    /* Acquire lock */
    wait(MUTEX)
    
    /* Check condition */
    if (lock) {
        if (isReader) {
            if (! can_read) {
                /* Give MUTEX back before sleeping so other threads can run the check */
                signal(MUTEX)
                wait(R_QUEUE)
            }
        } else if (isWriter) {
            if (! can_write) {
                /* Same as for readers, but sleep on the writers' queue */
                signal(MUTEX)
                wait(W_QUEUE)
            }
        }
    }
    
    /* Allow next */
    /*
     * Exactly one semaphore is signalled: a waiting writer or reader that may
     * now proceed is woken in place of releasing MUTEX; MUTEX itself is
     * released only when nobody can be let through.
     */
    if (writers_waiting > 0 && can_write) {
        signal(W_QUEUE)
    } else if (readers_waiting > 0 && can_read) {
        signal(R_QUEUE)
    } else {
        signal(MUTEX)
    }
}
```
\`can_read\` and \`can_write\` conditions are just
```
/* Readers may enter as long as no writer holds the lock. */
int can_read = (writer_count == 0);
/* A writer additionally needs every reader to be gone. */
int can_write = (can_read && reader_count == 0);
```
The counters \`writer_count\`, \`reader_count\`, \`writers_waiting\`, \`readers_waiting\` now just have to be managed properly.

Drawback of this implementation: writer starvation is a possibility if too many reader processes acquire shared locks too quickly. 

\textbf{Implementation}: 

rw.h
```
#ifndef __RW_H__
#define __RW_H__
#include <semaphore.h>
/* Bit flags for rw_lock(): one operation bit OR-ed with one role bit. */
enum {
    RW_LOCK   = 1 << 0, /* operation: acquire the lock */
    RW_UNLOCK = 1 << 1, /* operation: release the lock */
    RW_READER = 1 << 2, /* role: reader */
    RW_WRITER = 1 << 3, /* role: writer */
};

/*
 * State of one reader/writer lock. Set up with rw_init(), operated through
 * rw_lock(), torn down with rw_destroy().
 */
struct rw_lock_t {
    int w_count;    /* writers currently holding the lock */
    int r_count;    /* readers currently holding the lock */
    int w_waiting;  /* writers currently blocked on w_queue */
    int r_waiting;  /* readers currently blocked on r_queue */
    sem_t m_mutex;  /* guards the four counters; initialised to 1 by rw_init() */
    sem_t r_queue;  /* blocked readers wait here; initialised to 0 by rw_init() */
    sem_t w_queue;  /* blocked writers wait here; initialised to 0 by rw_init() */
};

/* Zero the counters of *rw and initialise its semaphores. Returns 0 on success. */
int rw_init(struct rw_lock_t *rw);
/*
 * Acquire or release *rw. flags is RW_LOCK or RW_UNLOCK OR-ed with
 * RW_READER or RW_WRITER. A lock request blocks until the lock can be
 * granted. Returns 0 on success.
 */
int rw_lock(struct rw_lock_t *rw, int flags);
/* Destroy the semaphores of *rw. Returns 0 on success. */
int rw_destroy(struct rw_lock_t *rw);

#endif
```

rw.c
```
#include "rw.h"

#include <errno.h>

/*
 * Prepare rw for use: no holders, no waiters, mutex free, both queues empty.
 *
 * rw: lock object to initialise.
 *
 * Returns 0 on success. Returns -1 if any semaphore cannot be initialised;
 * semaphores that were already initialised are destroyed again so nothing
 * is left half set up.
 */
int rw_init(struct rw_lock_t *rw)
{
    rw->w_count = 0;
    rw->r_count = 0;
    rw->w_waiting = 0;
    rw->r_waiting = 0;

    /* The mutex starts at 1 (free); the queues start at 0 so waiters block. */
    if (sem_init(&rw->m_mutex, 0, 1) != 0) {
        return -1;
    }
    if (sem_init(&rw->r_queue, 0, 0) != 0) {
        goto fail_mutex;
    }
    if (sem_init(&rw->w_queue, 0, 0) != 0) {
        goto fail_r_queue;
    }
    return 0;

fail_r_queue:
    sem_destroy(&rw->r_queue);
fail_mutex:
    sem_destroy(&rw->m_mutex);
    return -1;
}

/*
 * Release the semaphores owned by rw.
 *
 * rw: lock object previously set up with rw_init().
 *
 * All three semaphores are destroyed even if one of the calls fails.
 * Returns 0 when every sem_destroy() succeeded, -1 otherwise.
 */
int rw_destroy(struct rw_lock_t *rw)
{
    int rc = 0;

    if (sem_destroy(&rw->r_queue) != 0) {
        rc = -1;
    }
    if (sem_destroy(&rw->w_queue) != 0) {
        rc = -1;
    }
    if (sem_destroy(&rw->m_mutex) != 0) {
        rc = -1;
    }
    return rc;
}

/*
 * Wait on sem, restarting the wait whenever a signal handler interrupts it.
 * Returns 0 once the semaphore has been decremented, -1 on any other error.
 */
static int rw_sem_wait(sem_t *sem)
{
    int rc;

    do {
        rc = sem_wait(sem);
    } while (rc != 0 && errno == EINTR);
    return rc;
}

/*
 * Acquire or release rw as a reader or as a writer.
 *
 * rw:    lock object set up with rw_init().
 * flags: exactly one of RW_LOCK / RW_UNLOCK, OR-ed with RW_READER or
 *        RW_WRITER (RW_READER wins if both role bits are set).
 *
 * A lock request blocks while the lock cannot be granted: readers wait while
 * a writer holds the lock, writers wait while anybody holds it.
 *
 * Returns 0 on success. Returns -1, leaving the counters untouched, when
 * flags name no single operation or no role, or when an unlock is requested
 * for a role that currently holds nothing. Returns -1 as well when waiting
 * on a semaphore fails for a reason other than EINTR; if that happens while
 * queued, the waiting counter has already been incremented.
 */
int rw_lock(struct rw_lock_t *rw, int flags)
{
    const int op = flags & (RW_LOCK | RW_UNLOCK);
    const int as_reader = (flags & RW_READER) != 0;
    const int as_writer = !as_reader && (flags & RW_WRITER) != 0;

    /* Reject requests that would otherwise "succeed" without doing anything. */
    if ((op != RW_LOCK && op != RW_UNLOCK) || (!as_reader && !as_writer)) {
        return -1;
    }
    if (rw_sem_wait(&rw->m_mutex) != 0) {
        return -1;
    }

    if (op == RW_LOCK) {
        if (as_reader) {
            if (rw->w_count > 0) {
                /*
                 * Register as waiting, then give up the mutex and sleep.
                 * Whoever wakes us does not release the mutex, so we own it
                 * again when sem_wait() returns.
                 */
                rw->r_waiting += 1;
                sem_post(&rw->m_mutex);
                if (rw_sem_wait(&rw->r_queue) != 0) {
                    return -1;
                }
            }
            rw->r_count += 1;
        } else {
            if (rw->w_count > 0 || rw->r_count > 0) {
                /* Same hand-over protocol as for readers. */
                rw->w_waiting += 1;
                sem_post(&rw->m_mutex);
                if (rw_sem_wait(&rw->w_queue) != 0) {
                    return -1;
                }
            }
            rw->w_count += 1;
        }
    } else {
        int *held = as_reader ? &rw->r_count : &rw->w_count;

        /*
         * An unmatched unlock would push the counter below zero, after which
         * the "== 0" wake-up tests below could never succeed again.
         */
        if (*held <= 0) {
            sem_post(&rw->m_mutex);
            return -1;
        }
        *held -= 1;
    }

    /*
     * Pass the mutex on: wake one waiter that may now proceed (writers are
     * considered first), or release the mutex if nobody can be let through.
     */
    if (rw->w_waiting > 0 && rw->w_count == 0 && rw->r_count == 0) {
        rw->w_waiting -= 1;
        sem_post(&rw->w_queue);
    } else if (rw->r_waiting > 0 && rw->w_count == 0) {
        rw->r_waiting -= 1;
        sem_post(&rw->r_queue);
    } else {
        sem_post(&rw->m_mutex);
    }
    return 0;
}

```

main.c
```
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include "rw.h"

/*
 * ANSI colour wrappers for string literals. ESC is written as \033 because
 * \e is a compiler extension, not a standard C escape sequence.
 */
#define RED(s) ("\033[7m\033[31m" s "\033[0m")   /* reverse video + red */
#define GREEN(s) ("\033[32m" s "\033[0m")        /* green */
/* Number of reader and writer threads started by main(). */
#define READER_COUNT 12
#define WRITER_COUNT 5

/* The single lock that every reader and writer thread in this demo contends for. */
struct rw_lock_t rw;
/*
 * Thread body of a reader: forever take the lock as a reader, print a green
 * " r " marker, release the lock and pause briefly. Never returns in practice.
 *
 * data: unused thread argument.
 */
void *reader(void *data)
{
    const int acquire = RW_LOCK | RW_READER;
    const int release = RW_UNLOCK | RW_READER;

    (void)data;
    while (1) {
        rw_lock(&rw, acquire);
        /* Read data */
        printf(GREEN(" r "));
        rw_lock(&rw, release);
        usleep(4);
    }
    return NULL;
}

/*
 * Thread body of a writer: forever take the lock as a writer, print a red
 * " w " marker, release the lock and pause briefly. Never returns in practice.
 *
 * data: unused thread argument.
 */
void *writer(void *data)
{
    const int acquire = RW_LOCK | RW_WRITER;
    const int release = RW_UNLOCK | RW_WRITER;

    (void)data;
    while (1) {
        rw_lock(&rw, acquire);
        /* Write data */
        printf(RED(" w "));
        rw_lock(&rw, release);
        usleep(4);
    }
    return NULL;
}

int main()
{
    rw_init(&rw);
    pthread_t tr[READER_COUNT], tw[WRITER_COUNT];
    int i;
    for (i = 0; i < READER_COUNT; i++)
        assert(pthread_create(&tr[i], NULL, reader, NULL) == 0);
    for (i = 0; i < WRITER_COUNT; i++)
        assert(pthread_create(&tw[i], NULL, writer, NULL) == 0);
    for (i = 0; i < READER_COUNT; i++)
        assert(pthread_join(tr[i], NULL) == 0);
    for (i = 0; i < WRITER_COUNT; i++)
        assert(pthread_join(tw[i], NULL) == 0);
    rw_destroy(&rw);
    return 0;
}```

Compile with \`gcc -pthread main.c rw.c\`, run \`./a.out\`. Interrupt with \`Ctrl + C\`.