Tuesday, May 5, 2009

*nix: crash-safe rw-lock with semaphores

In previous post about rw-locks based on semaphores I've introduced rw-lock mechanism to limit readers.
This post introduces crash-safe implementation of rw-locks. The implemlementation doesn't take into account bad application design. It doesn't detect deadlocks however this is not hard to implement when you get the main idea.
The implementation tracks readers and writers and can detect whether reader/writer that currently blocks the queue is alive and tries to improve the whole situation.
The threads are not taken into account because if thread crashes due to some unpredicted reason most likely the process that created it will die too. If thread exits without releasing the lock this is not a crash - it's poor application design. Safiness of such behaviors is another topic I believe.

In the real world you may get some situation when you want some safiness with locking the section and the code of this section might fail because of the outer world influence(unhandled signal, OOM-killer) and interior ifluence(the code of section might cause segfault).

To behonest most of the time I don't believe the code I'm protecting and that the outer world is kind for me. The code might cause crash deeply inside the function calls and the OOM killer might chose the application as victim.
Knowing all these I want the whole application continue to run.
The implemetation of crash-safe rw-locks is listed below. Well, it's a bit linux-specific(in the mmap call) but this is just for simplicity. I didn't want to overload the code to make it cross-platform.

#include <semaphore.h>
#include <sys/mman.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <string.h>

struct rw_lock *rw;

struct rw_lock {
 sem_t wbl;
 sem_t rbl;
 sem_t r;
 sem_t w;
 int limit;
 pid_t writer;
 pid_t *readers;
};

void init_rwlock(struct rw_lock **rw, int limit)
{
 *rw = mmap(NULL, sizeof(struct rw_lock), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, 0, 0);

 sem_init(&(*rw)->r, 1, limit);
 sem_init(&(*rw)->w, 1, 1);
 sem_init(&(*rw)->wbl, 1, 1);
 sem_init(&(*rw)->rbl, 1, 1);
 (*rw)->limit = limit;
 (*rw)->writer = 0;
 (*rw)->readers = mmap(NULL, sizeof(pid_t)*limit, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, 0, 0);
}

void rlock(struct rw_lock *rw)
{
 int i;
 struct timespec to = {
  .tv_sec = 1,
  .tv_nsec = 0
 };

 sem_wait(&rw->w);

 do {
  if (sem_timedwait(&rw->r, &to) == 0) {
   break;
  } else if (errno == ETIMEDOUT) {
   sem_wait(&rw->rbl);
   for (i=0;i<rw->limit;++i)
    if (rw->readers[i] && kill(rw->readers[i], 0) == -1 && errno == ESRCH) {
     printf("deadlock detected: process invoked rlock died(%d)\n", rw->readers[i]), fflush(NULL);
     rw->readers[i] = 0;
     sem_post(&rw->r);

     break;
    }
   sem_post(&rw->rbl);
  }
 } while (1);

 sem_wait(&rw->rbl);
 for (i=0;i<rw->limit;++i)
  if (rw->readers[i] == 0) {
   rw->readers[i] = getpid();

   break;
  }
 sem_post(&rw->rbl);

 sem_post(&rw->w);
}

void runlock(struct rw_lock *rw)
{
 int i, current = getpid();

 sem_wait(&rw->rbl);
 for (i=0;i<rw->limit;++i)
  if (rw->readers[i] == current) {
   rw->readers[i] = 0;

   break;
  }
 sem_post(&rw->rbl);

 sem_post(&rw->r);
}

void wlock(struct rw_lock *rw)
{
 int val;
 pid_t current = getpid();
 struct timespec to = {
  .tv_sec = 1,
  .tv_nsec = 0
 };
 time_t wfr0, wfr1;

 do {
  if (sem_timedwait(&rw->w, &to) == 0) {
   break;
  } else if (errno == ETIMEDOUT) {
   sem_wait(&rw->wbl);
   if (rw->writer && kill(rw->writer, 0) == -1 && errno == ESRCH) {
    printf("deadlock detected: process invoked wlock died(%d)\n", rw->writer), fflush(NULL);
    rw->writer = 0;
    sem_post(&rw->w);
   }
   sem_post(&rw->wbl);
  }
 } while (1);
 sem_wait(&rw->wbl);
 rw->writer = current;
 sem_post(&rw->wbl);

 wfr0 = time(NULL);
 do {
  wfr1 = time(NULL);
  if ((wfr1 - wfr0) > 1) {
   int i;
   sem_wait(&rw->rbl);
   for (i=0;rw->limit;++i)
    if (rw->readers[i] && kill(rw->readers[i], 0) == -1 && errno == ESRCH) {
     printf("deadlock detected: process invoked rlock died(%d)\n", rw->readers[i]), fflush(NULL);
     rw->readers[i] = 0;
     sem_post(&rw->r);

     break;
    }
   sem_post(&rw->rbl);
   wfr0 = wfr1;
  }

  sem_getvalue(&rw->r, &val);

 } while (val != rw->limit);
}

void wunlock(struct rw_lock *rw)
{
 sem_wait(&rw->wbl);
 rw->writer = 0;
 sem_post(&rw->wbl);

 sem_post(&rw->w);
}
The idea is simple. If the process unable to lock the section for some time it checks whether the previos holder is alive. Otherwise the process tries to release the semaphore and accuire it again. It's possible to omit release/accuire procedure and just change the holder. You'll gain some performance with this change. I left this just to make the code more clear.

The application that simulates crashes is shown below:
int *counter;

void reader(void)
{
 while (1) {
  rlock(rw);
  if (*counter == 1024*4) {
   runlock(rw);
   break;
  }
  if (*counter !=0 && *counter%1024 == 0) {
   printf("reader died(counter: %d, pid: %d)\n", *counter,getpid()), fflush(NULL);
   *(int *)0 = 0;
  }
  runlock(rw);
 }
}

void writer(void)
{
 while (1) {
  wlock(rw);
  if (*counter == 2048*2) {
   wunlock(rw);
   break;
  }
  ++*counter;
  if (*counter !=0 && *counter%2048 == 0) {
   printf("writer died(counter: %d, pid: %d)\n", *counter, getpid()), fflush(NULL);
   *(int *)0 = 0;
  }
  wunlock(rw);
 }
}
int main(int argc, char **argv)
{
 int i;

 counter = mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, 0, 0);

 init_rwlock(&rw, 5);

 for (i=0;i<10;++i)
  if (fork() == 0) {
   reader();

   return 0;
  }

 for (i=0;i<5;++i)
  if (fork() == 0) {
   writer();

   return 0;
  }

 for (i=0;i<15;++i)
  wait(NULL);

 printf("counter: %d\n", *counter);

 return 0;
}
After running this application you'll find the information about the crashed processes in dmesg:
*sudo dmesg -c
test[8806]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8812]: segfault at 0 ip 08048e01 sp bf828290 error 6 in test[8048000+2000]
test[8801]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8804]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8808]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8799]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8800]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8802]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8805]: segfault at 0 ip 08048d5e sp bf828290 error 6 in test[8048000+2000]
test[8809]: segfault at 0 ip 08048e01 sp bf828290 error 6 in test[8048000+2000]
And the application should produce following output:
reader died(counter: 1024, pid: 8806)
deadlock detected: process invoked rlock died(8806)
writer died(counter: 2048, pid: 8812)
deadlock detected: process invoked wlock died(8812)
reader died(counter: 2048, pid: 8801)
reader died(counter: 2048, pid: 8804)
reader died(counter: 2048, pid: 8808)
reader died(counter: 2048, pid: 8799)
reader died(counter: 2048, pid: 8802)
deadlock detected: process invoked rlock died(8801)
reader died(counter: 2048, pid: 8800)
deadlock detected: process invoked rlock died(8800)
reader died(counter: 2048, pid: 8805)
deadlock detected: process invoked rlock died(8805)
deadlock detected: process invoked rlock died(8804)
deadlock detected: process invoked rlock died(8808)
deadlock detected: process invoked rlock died(8799)
deadlock detected: process invoked rlock died(8802)
writer died(counter: 4096, pid: 8809)
deadlock detected: process invoked wlock died(8809)
counter: 4096, max_readers: 5
As you can see there are few readers crashed in the same circumstances just because they were allowed to do so.