\textbf{Readers and Writers} Assume there are \(N\) reader threads, \(M\) writer threads and a shared file. Reader threads should be able to read the file concurrently, but only one writer at a time should be able to modify the file, and never while readers are reading it. The readers' lock is often called a "shared lock", while the writers' lock is often called an "exclusive lock". In Unix, this type of lock is already provided by the OS through \`flock()\`: ``` void MT_safe_read(int fd) { flock(fd, LOCK_SH); /* Read */ flock(fd, LOCK_UN); } void MT_safe_write(int fd) { flock(fd, LOCK_EX); /* Write */ flock(fd, LOCK_UN); } ``` Note that \`flock()\` locks are advisory and are attached to the open file description, so threads that share a single descriptor do not exclude each other; each thread needs its own \`open()\` of the file. In addition, POSIX threads (pthreads) also have an implementation of a read/write lock: ``` void MT_safe_read(pthread_rwlock_t *rw) { pthread_rwlock_rdlock(rw); /* Read */ pthread_rwlock_unlock(rw); } void MT_safe_write(pthread_rwlock_t *rw) { pthread_rwlock_wrlock(rw); /* Write */ pthread_rwlock_unlock(rw); } ``` Using POSIX semaphores, the RW-lock logic will have to be implemented manually. The basic idea is: A reader (or writer) will check if they can read (or write) before locking the file with a shared (or exclusive) lock. If they find themselves unable to lock the file, they will put themselves in the "want-to-read" waiting queue (or "want-to-write" waiting queue). Once any process finishes reading (or writing) they will check if there is anybody in the "want-to-write" waiting queue & writing is allowed, or if there is anybody in the "want-to-read" waiting queue & reading is allowed. If yes, they will be let through. If not, the mutex will be released (the checking process needs to be atomic, thus a mutex is necessary). Pseudo-code for this logic would look like: ``` void rw_lock(...) { /* Acquire lock */ wait(MUTEX) /* Check condition */ if (lock) { if (isReader) { if (! can_read) { signal(MUTEX) wait(R_QUEUE) } } else if (isWriter) { if (! 
can_write) { signal(MUTEX) wait(W_QUEUE) } } } /* Allow next */ if (writers_waiting > 0 && can_write) { signal(W_QUEUE) } else if (readers_waiting > 0 && can_read) { signal(R_QUEUE) } else { signal(MUTEX) } } ``` \`can_read\` and \`can_write\` conditions are just ``` int can_read = (writer_count == 0); int can_write = (reader_count == 0 && writer_count == 0); ``` The counters \`writer_count\`, \`reader_count\`, \`writers_waiting\`, \`readers_waiting\` now just have to managed be properly. Drawback of this implementation: writer process starvation is a possibilty if there are too many readers processes acquiring shared locks too quickly. \textbf{Implementation}: rw.h ``` #ifndef __RW_H__ #define __RW_H__ #include <semaphore.h> enum { RW_LOCK = 1, RW_UNLOCK = 2, RW_READER = 4, RW_WRITER = 8, }; struct rw_lock_t { int w_count; int r_count; int w_waiting; int r_waiting; sem_t m_mutex; sem_t r_queue; sem_t w_queue; }; int rw_init(struct rw_lock_t *rw); int rw_lock(struct rw_lock_t *rw, int flags); int rw_destroy(struct rw_lock_t *rw); #endif ``` rw.c ``` #include "rw.h" int rw_init(struct rw_lock_t *rw) { rw->w_count = 0; rw->r_count = 0; rw->w_waiting = 0; rw->r_waiting = 0; sem_init(&rw->m_mutex, 0, 1); sem_init(&rw->r_queue, 0, 0); sem_init(&rw->w_queue, 0, 0); return 0; } int rw_destroy(struct rw_lock_t *rw) { sem_destroy(&rw->r_queue); sem_destroy(&rw->w_queue); sem_destroy(&rw->m_mutex); return 0; } int rw_lock(struct rw_lock_t *rw, int flags) { sem_wait(&rw->m_mutex); switch ((flags & RW_LOCK) | (flags & RW_UNLOCK)) { case RW_LOCK: { if (flags & RW_READER) { if (rw->w_count > 0) { rw->r_waiting += 1; sem_post(&rw->m_mutex); sem_wait(&rw->r_queue); } rw->r_count += 1; } else if (flags & RW_WRITER) { if (rw->w_count > 0 || rw->r_count > 0) { rw->w_waiting += 1; sem_post(&rw->m_mutex); sem_wait(&rw->w_queue); } rw->w_count += 1; } } break; case RW_UNLOCK: { if (flags & RW_READER) { rw->r_count -= 1; } else if (flags & RW_WRITER) { rw->w_count -= 1; } } break; } if 
(rw->w_waiting > 0 && rw->w_count == 0 && rw->r_count == 0) { rw->w_waiting -= 1; sem_post(&rw->w_queue); } else if (rw->r_waiting > 0 && rw->w_count == 0) { rw->r_waiting -= 1; sem_post(&rw->r_queue); } else { sem_post(&rw->m_mutex); } return 0; } ``` main.c ``` #include <stdio.h> #include <pthread.h> #include <assert.h> #include <stdlib.h> #include <unistd.h> #include "rw.h" #define RED(s) ("\e[7m\e[31m" s "\e[0m") #define GREEN(s) ("\e[32m" s "\e[0m") #define READER_COUNT 12 #define WRITER_COUNT 5 struct rw_lock_t rw; void* reader(void* data) { for ( ; ; ) { rw_lock(&rw, RW_LOCK | RW_READER); /* Read data */ printf(GREEN(" r ")); rw_lock(&rw, RW_UNLOCK | RW_READER); usleep(4); } return NULL; } void* writer(void* data) { for ( ; ; ) { rw_lock(&rw, RW_LOCK | RW_WRITER); /* Write data */ printf(RED(" w ")); rw_lock(&rw, RW_UNLOCK | RW_WRITER); usleep(4); } return NULL; } int main() { rw_init(&rw); pthread_t tr[READER_COUNT], tw[WRITER_COUNT]; int i; for (i = 0; i < READER_COUNT; i++) assert(pthread_create(&tr[i], NULL, reader, NULL) == 0); for (i = 0; i < WRITER_COUNT; i++) assert(pthread_create(&tw[i], NULL, writer, NULL) == 0); for (i = 0; i < READER_COUNT; i++) assert(pthread_join(tr[i], NULL) == 0); for (i = 0; i < WRITER_COUNT; i++) assert(pthread_join(tw[i], NULL) == 0); rw_destroy(&rw); return 0; }``` Compile with \`gcc -pthread main.c rw.c\`, run \`./a.out\`. Interrupt with \`Ctrl + C\`.