Thursday, March 18, 2010

event handling with message queue on top of pipes: part 1

Event-driven program flow is common in interactive systems. Events usually come into the system in a non-deterministic way, and it is not always known how much time handling an event will take. Moreover, new events may arrive while the current one is still being processed. It is therefore better to handle events in parallel to avoid piling up pending events.
Some message interchange mechanism is needed to transmit events between the main dispatcher and the handlers.
Different message queue implementations already exist, but here I'd like to reinvent the wheel and show that there's still room for imagination.

In this message queue implementation a pipe is used as the transmission path for messages.
A pipe is a generic mechanism for exchanging data between processes. It is a unidirectional channel, a bridge, between two processes: one process writes data into the write end of the pipe while another reads the written data from the read end. In most cases there is a writer on one side and a reader on the other.
The [one reader]/[one writer] concept could be extended to [multiple readers]/[multiple writers], but that would complicate things. Keeping things simple means creating two separate pipes per handler for the message queue: one for sending requests from the dispatcher's side and one for sending responses from the handler's side.

Unlike a FIFO (a named pipe visible in userspace within the filesystem as a special file), a pipe created by a process is visible only to the children of that process (and to the process itself, of course). This is a consequence of how a pipe is represented: a file descriptor for each end.
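For comparison, a FIFO is created with mkfifo and shows up as an ordinary filesystem entry that any process allowed to open the path can use (the path below is just a placeholder for the example):
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>

int main(void)
{
    /* creates a named pipe visible to any process that can open the path */
    if (mkfifo("/tmp/example_fifo", 0600) == -1)
        perror("mkfifo");

    return 0;
}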
An unnamed pipe, in contrast, is created by calling the pipe routine (declared in unistd.h):

int pipe(int pipefd[2]);
Calling pipe yields two file descriptors: one refers to the read end of the pipe, the other to the write end. In most cases this call directly invokes a system call of the same name.
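Before diving into the kernel, a minimal self-contained example of the typical usage pattern (parent writes, child reads; the message text is arbitrary) might look like this:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
    int pipefd[2];
    char buf[64];

    if (pipe(pipefd) == -1) {
        perror("pipe");
        return EXIT_FAILURE;
    }

    switch (fork()) {
    case -1:
        perror("fork");
        return EXIT_FAILURE;
    case 0:                      /* child: the reader */
        close(pipefd[1]);        /* close the unused write end */
        if (read(pipefd[0], buf, sizeof(buf)) > 0)
            printf("child got: %s\n", buf);
        close(pipefd[0]);
        _exit(0);
    default:                     /* parent: the writer */
        close(pipefd[0]);        /* close the unused read end */
        write(pipefd[1], "ping", 5);
        close(pipefd[1]);
        wait(NULL);
    }

    return 0;
}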
Looking at the Linux kernel sources, sys_pipe immediately passes control to the sys_pipe2 system call:
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{
 int fd[2];
 int error;

 error = do_pipe_flags(fd, flags);
...
}
SYSCALL_DEFINE1(pipe, int __user *, fildes)
{
 return sys_pipe2(fildes, 0);
}
The flags are of no interest for now, which is why sys_pipe is used: it passes zero as the flags argument to sys_pipe2.
The most interesting part is hidden in the do_pipe_flags routine, highlighted below:
int do_pipe_flags(int *fd, int flags)
{
 struct file *fw, *fr;
...
 fw = create_write_pipe(flags);
...
 fr = create_read_pipe(fw, flags);
...
}
Before going further I would like to make a small remark on how pipes are managed inside the Linux kernel. The kernel manages pipes as files on the pipefs pseudo filesystem. This filesystem cannot be mounted from userspace because it has the MS_NOUSER flag, but it is visible inside the kernel under the 'pipe:' mountpoint:
static struct vfsmount *pipe_mnt;
...
static int pipefs_get_sb(struct file_system_type *fs_type,
    int flags, const char *dev_name, void *data,
    struct vfsmount *mnt)
{
 return get_sb_pseudo(fs_type, "pipe:", NULL, PIPEFS_MAGIC, mnt);
}

static struct file_system_type pipe_fs_type = {
 .name  = "pipefs",
 .get_sb  = pipefs_get_sb,
 .kill_sb = kill_anon_super,
};

static int __init init_pipe_fs(void)
{
 int err = register_filesystem(&pipe_fs_type);

 if (!err) {
  pipe_mnt = kern_mount(&pipe_fs_type);
  ...
}
In the do_pipe_flags routine create_write_pipe allocates an inode on pipefs with the help of get_pipe_inode, which in turn calls alloc_pipe_info:
static struct inode * get_pipe_inode(void)
{
 struct inode *inode = new_inode(pipe_mnt->mnt_sb);
 struct pipe_inode_info *pipe;
...
 pipe = alloc_pipe_info(inode);
...
}
Besides initializing pipe-specific data, alloc_pipe_info creates a waitqueue, which is of some interest for the message queue (as will be shown later):
struct pipe_inode_info * alloc_pipe_info(struct inode *inode)
{
...
  init_waitqueue_head(&pipe->wait);
...
 return pipe;
}
Returning to create_write_pipe, the kernel creates a new file with write-only flags and the write_pipefifo_fops file operations:
const struct file_operations write_pipefifo_fops = {
 .llseek  = no_llseek,
 .read  = bad_pipe_r,
 .write  = do_sync_write,
 .aio_write = pipe_write,
...
 .open  = pipe_write_open,
...
};
...
 f = alloc_file(pipe_mnt, dentry, FMODE_WRITE, &write_pipefifo_fops);
...
 f->f_flags = O_WRONLY | (flags & O_NONBLOCK);
This is the write end of the pipe.
create_read_pipe, used to create the read end of the pipe, reuses the work already done by create_write_pipe:
struct file *create_read_pipe(struct file *wrf, int flags)
{
 struct file *f = get_empty_filp();
...
 /* Grab pipe from the writer */
 f->f_path = wrf->f_path;
 path_get(&wrf->f_path);
 f->f_mapping = wrf->f_path.dentry->d_inode->i_mapping;

 f->f_pos = 0;
 f->f_flags = O_RDONLY | (flags & O_NONBLOCK);
 f->f_op = &read_pipefifo_fops;
...
}
The inode mapping is, unsurprisingly, taken from the write end, but the file operations differ:
const struct file_operations read_pipefifo_fops = {
 .llseek  = no_llseek,
 .read  = do_sync_read,
 .aio_read = pipe_read,
 .write  = bad_pipe_w,
...
 .open  = pipe_read_open,
...
};
Then, with the help of get_unused_fd_flags, the file descriptors that will be returned to userspace upon successful completion of the sys_pipe system call are created.

There's no need to stay in kernel mode any longer for the moment. Let's return to the userspace implementation of the message queue.

The basic structures used to form message requests and responses are divided into header and payload parts:
struct mq_data {
    unsigned long len;
    char *data;
} __attribute__((packed));

struct mq_request {
    unsigned char event; // event type
    unsigned long long id; // request id
    unsigned long time; // issue time
    struct mq_data data; // payload
} __attribute__((packed));

struct mq_response {
    unsigned char event; // event type
    unsigned long long req_id; // request id to which the response belongs
    unsigned long long rsp_id; // response id
    unsigned long time; // issue time
    struct mq_data data; // payload
} __attribute__((packed));
struct mq_data represents the message payload and is common to requests and responses. The comments are descriptive enough. It is worth mentioning which event types are predefined in the current implementation:
enum mq_event {
    MQ_EVENT_REQUEST            = (1 << 0), // generic request to the client
    MQ_EVENT_REQUEST_NO_ACK     = (1 << 1), // request that doesn't require ACK from the client
    MQ_EVENT_RESPONSE           = (1 << 2), // response from the client
    MQ_EVENT_ACK                = (1 << 3), // ACK to the request that it was processed
    MQ_EVENT_EXIT               = (1 << 4), // call to terminate the client
};
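The routines that actually push these structures through the pipes are not shown in this part. Purely as an illustration (the wire format below is my assumption, not necessarily what the queue uses), a naive way to send a request over the write end could be:
#include <stddef.h> /* offsetof */
#include <unistd.h> /* write */

/* Hypothetical helper, not part of the queue code: writes the fixed-size
 * header (everything before the payload pointer), then the payload length
 * and the payload bytes themselves. */
static int send_request_naive(int wfd, const struct mq_request *req)
{
    size_t hdr = offsetof(struct mq_request, data);

    if (write(wfd, req, hdr) != (ssize_t)hdr)
        return -1;
    if (write(wfd, &req->data.len, sizeof(req->data.len)) != (ssize_t)sizeof(req->data.len))
        return -1;
    if (req->data.len > 0 &&
        write(wfd, req->data.data, req->data.len) != (ssize_t)req->data.len)
        return -1;

    return 0;
}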
The principle of operation is pretty simple. During the initialization stage the core process launches the subprocesses that will handle messages; additionally it launches a thread that will receive responses from the handlers:
#define HANDLERS 3
struct mq mq;
struct mq_handler handlers[HANDLERS];
int main(int argc, char **argv)
{
    int i;
....
    mq_init(&mq);
....
    for (i=0;i<HANDLERS;++i)
        if (mq_launch_handler(&mq, &handlers[i], handler) != 0) {
            fprintf(stderr, "Failed to launch handler\n");
            goto out;
        }
....
The struct mq structure seen above defines the core message queue. Its definition is as follows:
struct mq {
    struct mq_list handlers;  // queue handlers
    struct mq_lock lock;      // management lock

    short state;              // state of the queue

    pthread_t rsp_manager;        // thread handle for managing responses
    struct mq_list rsp_list;  // list of responses
    pthread_mutex_t rsp_lock; // lock for the list of responses
};
struct mq_list is a generic doubly-linked list implementation; I will not bother to discuss it here. The locking primitive struct mq_lock is built upon a POSIX mutex in conjunction with simple reference counting.
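Since their definitions are omitted, the snippet below is only a plausible shape for these helpers, assuming a conventional intrusive list node and a mutex paired with a reference counter and an availability flag (the field names are my guesses based on how they are used):
#include <pthread.h>

struct mq_list {
    struct mq_list *prev, *next; /* intrusive doubly-linked list node */
};

struct mq_lock {
    pthread_mutex_t mutex; /* protects the owning object */
    int refs;              /* simple reference counter */
    int available;         /* set via mq_set_available once the object is ready */
};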
The main job of the response handling thread mentioned above is to fetch packets from the pipe, do some preprocessing (mostly needed for finer scheduling) and put them into a local queue for later use.
struct mq_handler contains the file descriptors for the pipes and a few fields for collecting statistics. The definition is as follows:
struct mq_handler {
    struct mq_list list;
    struct mq_lock lock;

    int pipein[2]; // 0 -> read end for obtaining a request, 1 -> write end for issuing a request
    int pipeout[2]; // 0 -> read end for obtaining a response, 1 -> write end for issuing a response
    int process; // handler PID

    unsigned long pushed, // amount of pushed requests
        popped; // amount of popped responses
    unsigned nacked; // not acked packets
};
mq_init, which appeared above, initializes the message queue structure and launches the management thread:
void
mq_init(struct mq *mq)
{
    mq_list_init(&mq->handlers);
    mq_list_init(&mq->rsp_list);

    mq_lock_init(&mq->lock);

    pthread_mutex_init(&mq->rsp_lock, NULL);

    mq->state = MQ_STATE_STARTING;

    pthread_create(&mq->rsp_manager, NULL, mq_poll_wrapper, mq);

    mq->state = MQ_STATE_RUNNING;
    mq_set_available(&mq->lock, 1);
}
This routine initializes miscellaneous stuff and announces that it's ready to go. mq_poll_wrapper triggers mq_poll, which fetches responses from the pipe, and also cleans up 'old' responses that most likely won't be needed anymore because their timeout has expired; the memory they occupy should be freed to avoid leaks:
void *
mq_poll_wrapper(void *data)
{
    struct mq *mq = (struct mq *)data;

    unsigned int last_cleaned = timems(), now;

    for (;;) {
        mq_lock_lock(&mq->lock);
        if (mq->state == MQ_STATE_STOPPING) {
            mq_lock_unlock(&mq->lock);

            return NULL;
        } else if (mq->state == MQ_STATE_STARTING) {
            mq_lock_unlock(&mq->lock);

            release_cpu();

            continue;
        }

        mq_lock_unlock(&mq->lock);

        mq_poll(mq);

        now = timems();
        if (now - last_cleaned > MQ_MAX_RESPONSE_LIFE_TIME_MS) {
            last_cleaned = now;

            mq_cleanup_response_queue(mq, now);
        }
    }

    return NULL;
}
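timems and release_cpu are small helpers that are not shown in the listing; presumably they boil down to something like the following (an assumption on my part: a monotonic millisecond clock and a CPU yield):
#include <sched.h>
#include <time.h>

/* Hypothetical helpers, assumed implementations */
static unsigned int timems(void)
{
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

static void release_cpu(void)
{
    sched_yield(); /* give other threads a chance to run */
}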
mq_launch_handler is a helper routine that creates the pipe endpoints for transmitting requests and responses and launches the request handling process. Its definition is as follows:
int
mq_launch_handler(struct mq *mq,
                  struct mq_handler *handler,
                  void (*f)(int endpoints[2]))
{
    int endpoints[2];

    if (pipe(handler->pipein) == -1) {
        perror("pipe");

        goto fail_pipein;
    }
    if (pipe(handler->pipeout) == -1) {
        perror("pipe");

        goto fail_pipeout;
    }

    endpoints[0] = handler->pipein[0];
    endpoints[1] = handler->pipeout[1];

    if ((handler->process = launch_process(f, endpoints)) == -1)
        goto fail_launch;

    mq_lock_init(&handler->lock);

    handler->nacked = handler->popped = handler->pushed = 0;

    mq_lock_lock(&mq->lock);
    mq_list_add_tail(&mq->handlers, &handler->list);
    mq_lock_unlock(&mq->lock);

    mq_set_available(&handler->lock, 1);

    return 0;

  fail_launch:
    close(handler->pipeout[0]);
    close(handler->pipeout[1]);
  fail_pipeout:
    close(handler->pipein[0]);
    close(handler->pipein[1]);
  fail_pipein:

    return -1;
}
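launch_process itself is not shown above; a minimal sketch of what it could look like, assuming a plain fork that hands the two endpoints to the handler function, is below (the real implementation may also close the unused pipe ends):
#include <stdio.h>
#include <unistd.h>

/* Hypothetical sketch of launch_process: fork and run the handler with the
 * read end of the request pipe and the write end of the response pipe. */
static int launch_process(void (*f)(int endpoints[2]), int endpoints[2])
{
    pid_t pid = fork();

    if (pid == -1) {
        perror("fork");
        return -1;
    }

    if (pid == 0) {    /* child: run the handler and never return */
        f(endpoints);
        _exit(0);
    }

    return pid;        /* parent: the handler PID */
}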
Now everything is ready for sending requests and receiving responses. Further details will be discussed in the following part.