Friday, September 10, 2010

event handling with message queue on top of pipes: part 2

The previous part stopped just before going into the details of the sending and receiving mechanism.
Each sent message is expected to receive an ACK from the handler once it has been processed. The ACK can be embedded in the handler's response or sent back as a dedicated message if no answer is expected. Either way it indicates that the handler has finished processing the message. The sender can also indicate that no ACK is needed at all.
On the one hand, the sender can use this information to conclude that message processing is complete. On the other hand, it makes it possible to estimate statistically how long a handler takes to process a specific message. Although this information is not used in the current rough implementation, it could be taken into account for probabilistic scheduling.

The prototype of the routine for sending a message is straightforward:

mq_cookie
mq_enqueue_request(struct mq *mq,
                   struct mq_request *req,
                   int broadcast)
It accepts the message queue descriptor as the first argument and the message itself as the second. The third argument states whether the message should be delivered to all handlers or to just one. The function returns a cookie which the caller can use to obtain the answer (or ACK) from the handler.
If the request is not a broadcast, mq_enqueue_request searches for the handler with the shortest message backlog. It also tries to pick a handler whose pipe can be written to without blocking. However, if all pipes are full, the call will block.
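To make the selection rule concrete, here is a minimal sketch of picking the handler with the shortest backlog. It assumes the backlog can be estimated as the number of pushed requests minus the number of popped responses; struct handler_stats and pick_least_loaded are hypothetical names used only for illustration, and the real routine may weigh things differently.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, trimmed-down stand-in for struct mq_handler: only the
 * counters needed to estimate the backlog are kept. */
struct handler_stats {
    unsigned long pushed; /* requests written to the handler */
    unsigned long popped; /* responses (or ACKs) read back */
};

/* Return the index of the handler with the fewest outstanding requests.
 * On a tie the handler with the lowest index wins. */
static size_t pick_least_loaded(const struct handler_stats *h, size_t n)
{
    size_t best = 0;

    assert(h != NULL && n > 0);
    for (size_t i = 1; i < n; ++i) {
        unsigned long backlog = h[i].pushed - h[i].popped;
        unsigned long best_backlog = h[best].pushed - h[best].popped;

        if (backlog < best_backlog)
            best = i;
    }
    return best;
}
```

A fuller version would probably also check with poll and POLLOUT that the chosen pipe is writable before committing to it.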

The aforementioned poll thread is used to keep handlers from blocking when they write responses (or ACKs) back. The thread iterates over the handlers' pipes, gathering the responses and putting them into a list. Each response has a lifetime after which its resources are freed.
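The gathering step could look roughly like the sketch below. It is not the code of the actual poll thread: drain_ready_pipes and MAX_PIPES are hypothetical, and the raw bytes are simply appended to a buffer instead of being parsed into responses.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

#define MAX_PIPES 16 /* hypothetical upper bound on the number of handlers */

/* Wait up to timeout_ms for data on any of the n read ends and append one
 * chunk from each readable pipe to buf. Returns total bytes read or -1. */
static long drain_ready_pipes(const int *read_fds, int n, int timeout_ms,
                              char *buf, size_t buflen)
{
    struct pollfd pfds[MAX_PIPES];
    size_t total = 0;

    assert(n > 0 && n <= MAX_PIPES);
    for (int i = 0; i < n; ++i) {
        pfds[i].fd = read_fds[i];
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }
    if (poll(pfds, (nfds_t)n, timeout_ms) == -1)
        return -1;

    for (int i = 0; i < n && total < buflen; ++i) {
        if (pfds[i].revents & POLLIN) {
            ssize_t got = read(pfds[i].fd, buf + total, buflen - total);

            if (got == -1)
                return -1;
            total += (size_t)got;
        }
    }
    return (long)total;
}
```

Because poll reports only the pipes that already hold data, the read calls in the loop do not block, and the handlers' writes are drained soon after they happen.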
In kernels prior to 2.6.35 there was no way to configure the pipe size; now it is possible via the F_SETPIPE_SZ fcntl (the maximum size for unprivileged users is limited by /proc/sys/fs/pipe-max-size). Since this thread fetches messages from the pipes on a regular basis, there's no need to set a big pipe size. The current upper bound of 1MB is big enough to hold thousands of short messages, and the size could be limited to a few KB unless big messages are required.
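Resizing a pipe takes a single fcntl call. The sketch below is Linux-specific; set_pipe_capacity is a hypothetical helper, and the fallback constants are only there for the case where the headers were included without _GNU_SOURCE.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* F_SETPIPE_SZ and F_GETPIPE_SZ are Linux extensions */
#endif
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

#ifndef F_SETPIPE_SZ
#define F_SETPIPE_SZ 1031 /* Linux value: F_LINUX_SPECIFIC_BASE (1024) + 7 */
#define F_GETPIPE_SZ 1032 /* Linux value: F_LINUX_SPECIFIC_BASE (1024) + 8 */
#endif

/* Ask the kernel to resize the pipe behind fd. Returns the capacity that
 * is actually in effect afterwards, or -1 on error. */
static int set_pipe_capacity(int fd, int bytes)
{
    assert(bytes > 0);
    if (fcntl(fd, F_SETPIPE_SZ, bytes) == -1)
        return -1;
    return fcntl(fd, F_GETPIPE_SZ);
}
```

The kernel may round the requested size up (at least to a whole page), which is why the helper reads the value back instead of trusting the argument.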

Collecting responses into a dedicated list is also needed so that a caller can obtain them, since pipes are not iterable. Once data has been read from a pipe it cannot simply be put back (to be more precise, it is possible, but iterating over the pipe would mean reading all messages and then writing them back; the overhead is too big in this case, and it would also require some synchronization mechanism to guard the write end of the pipe). mq_get_response is used to fetch the response from the global list:
struct mq_response *
mq_get_response(struct mq *mq,
                mq_cookie cookie)
The second argument, cookie, is the response identifier returned by mq_enqueue_request. If the message is not in the list of already fetched responses, this routine returns NULL. At the moment there is no way to tell whether the response is still on the way or was already purged.
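The lookup itself is a plain list search. The sketch below uses a singly linked list and assumes the cookie is an integer; cookie_t, struct response and take_response are hypothetical stand-ins, not the types from the sources, and locking is left out.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned long long cookie_t; /* hypothetical stand-in for mq_cookie */

struct response {
    cookie_t cookie;       /* identifies the request being answered */
    struct response *next;
};

/* Unlink and return the response matching cookie, or NULL if it is not
 * in the list (either not fetched yet or already purged). */
static struct response *take_response(struct response **head, cookie_t cookie)
{
    assert(head != NULL);
    for (struct response **pp = head; *pp != NULL; pp = &(*pp)->next) {
        if ((*pp)->cookie == cookie) {
            struct response *found = *pp;

            *pp = found->next;
            found->next = NULL;
            return found;
        }
    }
    return NULL;
}
```

As in the real routine, a NULL result does not distinguish between "not arrived yet" and "already gone", so a caller has to keep the cookie and retry later.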

The following example illustrates the flow of sending messages and receiving responses:
    int id;
    for (id=0;id<10000;++id) {
        struct mq_list *e, *t;
        struct mq_cookie_list *entry;
        mq_cookie cookie = mq_enqueue_request(&mq,
                                                         ((unsigned long long)pid << 32) | id,

        mq_list_for_each_safe(&cookies.list, e, t) {
            entry = container_of(e, struct mq_cookie_list, list);

            if ((rsp = mq_get_response(&mq, entry->cookie))) {
                /* Do something with the response */

        if ((rsp = mq_get_response(&mq, cookie))) {
            /* Do something with the response */
        } else {
            struct mq_cookie_list *entry = malloc(sizeof(struct mq_cookie_list));
            entry->cookie = cookie;
            mq_list_add_tail(&cookies.list, &entry->list);

A basic handler should deal with two kinds of events: MQ_EVENT_EXIT and MQ_EVENT_REQUEST. The following function could serve as a skeleton for a message handler:
handler(int endpoints[2])
    struct mq_request req;
    struct mq_response rsp;
    unsigned int served = 0, id = 0;
    unsigned int pid = getpid();

    printf("Event Handler %u\n", pid);

    for (;;) {
        mq_get_request(&req, endpoints);


        if (req.event & MQ_EVENT_EXIT) {
                printf("Event Handler %u: exiting, request %u:%u, served %u\n", pid, (unsigned int)( >> 32), (unsigned int)( & 0xffffffff), served);
                mq_free_request(&req, endpoints);
        } else if (req.event & MQ_EVENT_REQUEST) {
            if ( {
                printf("Event Handler %u: request %u:%u, %s\n", pid, (unsigned int)( >> 32), (unsigned int)( & 0xffffffff), (char *);

                                                      ((unsigned long long)pid << 32) | id++,
        } else {
            printf("Event Handler %u: unknown event: %d", pid, (int)req.event);
            mq_free_request(&req, endpoints);
The sources of the pipe message queue can be found on GitHub.

Friday, June 4, 2010

urxvt: 256 colors in ubuntu

Unfortunately, Ubuntu ships the rxvt-unicode package without full 256-color support.

The easiest way to get it is to rebuild the package manually.

The following steps describe how to do that:

Get urxvt sources:

$ apt-get source rxvt-unicode
$ cd rxvt-unicode-9.06
Apply the 256-color patch that is already shipped with the sources:
$ patch -p1 < doc/urxvt-8.2-256color.patch
Edit the debian build rules file to reflect the changes from the patch. In debian/rules find the definition of cfgcommon and replace the existing --with-term option with:
--with-term=rxvt-256color --enable-xterm-colors=256
Now install the build dependencies and build the package:
$ sudo apt-get build-dep rxvt-unicode
$ dpkg-buildpackage -us -uc
Once the build has finished, install the resulting deb package:
$ sudo dpkg -i rxvt-unicode_9.06-1ubuntu0.09.10.1_i386.deb
At this point it's possible to remove the build dependencies:
$ sudo aptitude markauto $(apt-cache showsrc rxvt-unicode | grep Build-Depends: | sed -e 's/Build-Depends:\|,\|([^)]*)//g')
Don't forget to configure .Xresources to make use of 256 colors in urxvt. For instance mine looks like:
URxvt.termName:             rxvt-256color
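A quick way to check the result is to print the whole palette with the xterm-style 256-color escape sequences. The small C sketch below does that; bg_color_sequence and print_palette are names made up for this example, and it assumes the terminal understands the "ESC[48;5;Nm" background sequence.

```c
#include <assert.h>
#include <stdio.h>

/* Format the escape sequence that selects background colour `index`
 * (0..255). Returns the length written, as snprintf does. */
static int bg_color_sequence(char *buf, size_t len, int index)
{
    assert(index >= 0 && index <= 255);
    return snprintf(buf, len, "\033[48;5;%dm", index);
}

/* Print all 256 colours as a 16x16 grid of coloured cells.
 * Returns the number of cells printed. */
static int print_palette(void)
{
    char seq[16];
    int cells = 0;

    for (int i = 0; i < 256; ++i) {
        bg_color_sequence(seq, sizeof seq, i);
        /* Two spaces on the chosen background, then reset attributes. */
        printf("%s  \033[0m%s", seq, (i % 16 == 15) ? "\n" : "");
        ++cells;
    }
    return cells;
}
```

Calling print_palette() from main in the rebuilt urxvt should show 256 distinct cells; on a build without 256-color support many of them come out identical or wrong.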

Thursday, March 18, 2010

event handling with message queue on top of pipes: part 1

Event-driven program flow is common for interactive systems. Events usually arrive in a non-deterministic way, and it's not always known how long handling an event will take. Moreover, new events can arrive while the current one is still being processed. It is therefore better to handle events in parallel to avoid piling up pending events.
Some message interchange mechanism is needed to transmit events between the main dispatcher and the handlers.
Various implementations of message queues exist, but here I'd like to reinvent the wheel and show that there's still room for imagination.

In this message queue implementation a pipe is used as the transmission path for messages.
A pipe is a generic mechanism for exchanging data between processes. It is a unidirectional channel, a bridge, between two processes. One process writes data into the write end of the pipe while another reads it from the read end. In most cases there is a writer on one side and a reader on the other.
The concept of [one reader]/[one writer] could be extended to [multiple readers]/[multiple writers], but this would complicate things. To keep things simple, two separate pipes are created for each handler: one for sending requests from the dispatcher's side and one for sending responses from the handler's side.

Unlike a FIFO (a named pipe visible in userspace as a special file in the filesystem), a pipe created by a process is visible only to that process and its children. This is a consequence of how a pipe is represented: a file descriptor for each end.
A pipe can be created by calling the pipe routine from the C library (declared in unistd.h):

int pipe(int pipefd[2]);
Two file descriptors result from calling pipe: one refers to the read end of the pipe, the other to the write end. In most cases this call directly invokes the system call of the same name.
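As a small illustration of the two descriptors and of the fact that a child inherits them, here is a sketch in which a forked child sends a short message to its parent. child_to_parent is a hypothetical helper, and error handling is kept to a minimum.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Send msg from a forked child to the parent through a pipe.
 * Returns the number of bytes the parent received, or -1 on error. */
static ssize_t child_to_parent(const char *msg, size_t len,
                               char *out, size_t outlen)
{
    int fd[2];
    pid_t pid;
    ssize_t got;

    assert(msg != NULL && out != NULL);
    if (pipe(fd) == -1)
        return -1;

    pid = fork();
    if (pid == -1) {
        close(fd[0]);
        close(fd[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: it inherited both ends, but only needs the write end. */
        close(fd[0]);
        _exit(write(fd[1], msg, len) == (ssize_t)len ? 0 : 1);
    }

    /* Parent: keep only the read end. */
    close(fd[1]);
    got = read(fd[0], out, outlen);
    close(fd[0]);
    waitpid(pid, NULL, 0);
    return got;
}
```

For messages no longer than PIPE_BUF the write is atomic, so a single read on the other side is enough in this toy case.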
From the Linux kernel sources it's clear that control is immediately passed from sys_pipe to the sys_pipe2 system call:
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
 int fd[2];
 int error;

 error = do_pipe_flags(fd, flags);
SYSCALL_DEFINE1(pipe, int __user *, fildes)
 return sys_pipe2(fildes, 0);
The flags are of no interest to us at the moment, which is why sys_pipe is used: it passes zero as the flags argument to sys_pipe2.
For us the most interesting part is hidden in the do_pipe_flags routine; the relevant fragment is shown below:
int do_pipe_flags(int *fd, int flags)
 struct file *fw, *fr;
 fw = create_write_pipe(flags);
 fr = create_read_pipe(fw, flags);
Before going further I would like to make a small remark on how pipes are managed inside the Linux kernel. The kernel manages pipes as files on a pseudo filesystem, pipefs. This filesystem cannot be mounted from userspace because it has the MS_NOUSER flag, but it is visible from the kernel under the pipe: 'mountpoint':
static struct vfsmount *pipe_mnt;
static int pipefs_get_sb(struct file_system_type *fs_type,
    int flags, const char *dev_name, void *data,
    struct vfsmount *mnt)
 return get_sb_pseudo(fs_type, "pipe:", NULL, PIPEFS_MAGIC, mnt);

static struct file_system_type pipe_fs_type = {
 .name  = "pipefs",
 .get_sb  = pipefs_get_sb,
 .kill_sb = kill_anon_super,

static int __init init_pipe_fs(void)
 int err = register_filesystem(&pipe_fs_type);

 if (!err) {
  pipe_mnt = kern_mount(&pipe_fs_type);
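Although pipefs cannot be mounted, traces of it show up in userspace. On Linux the symlink /proc/self/fd/N for a pipe descriptor points to a name of the form pipe:[inode], and fstat reports the descriptor as a FIFO. The sketch below checks both; is_pipe and fd_link_target are hypothetical helper names, and the second one assumes procfs is mounted.

```c
#include <assert.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Return 1 if fd refers to a pipe or FIFO, 0 if not, -1 on error. */
static int is_pipe(int fd)
{
    struct stat st;

    if (fstat(fd, &st) == -1)
        return -1;
    return S_ISFIFO(st.st_mode) ? 1 : 0;
}

/* Copy the target of /proc/self/fd/<fd> into buf (NUL-terminated).
 * Returns the length of the target, or -1 if it cannot be read. */
static ssize_t fd_link_target(int fd, char *buf, size_t len)
{
    char path[64];
    ssize_t n;

    assert(buf != NULL && len > 0);
    snprintf(path, sizeof path, "/proc/self/fd/%d", fd);
    n = readlink(path, buf, len - 1);
    if (n >= 0)
        buf[n] = '\0';
    return n;
}
```

Both ends of one pipe report the same inode number in the link target, which matches the kernel side where the read end shares the inode created for the write end.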
In the do_pipe_flags routine, create_write_pipe allocates an inode on pipefs with the help of get_pipe_inode, which in turn calls alloc_pipe_info:
static struct inode * get_pipe_inode(void)
 struct inode *inode = new_inode(pipe_mnt->mnt_sb);
 struct pipe_inode_info *pipe;
 pipe = alloc_pipe_info(inode);
Besides initializing pipe-specific data, alloc_pipe_info creates a wait queue, which is of some interest for the message queue (as will be shown below):
struct pipe_inode_info * alloc_pipe_info(struct inode *inode)
 return pipe;
Returning to create_write_pipe, the kernel creates a new file with write-only flags and the write_pipefifo_fops file operations:
const struct file_operations write_pipefifo_fops = {
 .llseek  = no_llseek,
 .read  = bad_pipe_r,
 .write  = do_sync_write,
 .aio_write = pipe_write,
 .open  = pipe_write_open,
 f = alloc_file(pipe_mnt, dentry, FMODE_WRITE, &write_pipefifo_fops);
 f->f_flags = O_WRONLY | (flags & O_NONBLOCK);
This is the write end of the pipe.
create_read_pipe, used to create the read end of the pipe, reuses the work done by create_write_pipe:
struct file *create_read_pipe(struct file *wrf, int flags)
 struct file *f = get_empty_filp();
 /* Grab pipe from the writer */
 f->f_path = wrf->f_path;
 f->f_mapping = wrf->f_path.dentry->d_inode->i_mapping;

 f->f_pos = 0;
 f->f_flags = O_RDONLY | (flags & O_NONBLOCK);
 f->f_op = &read_pipefifo_fops;
The inode mapping is, unsurprisingly, taken from the write end, but the file operations differ:
const struct file_operations read_pipefifo_fops = {
 .llseek  = no_llseek,
 .read  = do_sync_read,
 .aio_read = pipe_read,
 .write  = bad_pipe_w,
 .open  = pipe_read_open,
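The one-directional nature of each end is easy to observe from userspace: reading from the write end or writing to the read end fails with EBADF, and seeking on either end fails with ESPIPE. The helpers in this sketch are hypothetical; they just return the errno of a failed call, or 0 if the call succeeded.

```c
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Try to read one byte from fd; return errno on failure, 0 on success. */
static int errno_of_read(int fd)
{
    char c;

    errno = 0;
    return read(fd, &c, 1) == -1 ? errno : 0;
}

/* Try to write one byte to fd; return errno on failure, 0 on success. */
static int errno_of_write(int fd)
{
    errno = 0;
    return write(fd, "x", 1) == -1 ? errno : 0;
}

/* Try to seek to the start of fd; return errno on failure, 0 on success. */
static int errno_of_seek(int fd)
{
    errno = 0;
    return lseek(fd, 0, SEEK_SET) == (off_t)-1 ? errno : 0;
}
```

With a pipe created as int p[2], errno_of_read(p[1]) and errno_of_write(p[0]) both yield EBADF, while errno_of_seek(p[0]) yields ESPIPE.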
Then, with the help of get_unused_fd_flags, the file descriptors are created that will be returned to userspace upon successful completion of the sys_pipe system call.

There's no more need to stay in kernel mode for the moment. Let's return to the userspace implementation of the message queue.

The basic structures used to form message requests and responses are divided into header and payload parts:
struct mq_data {
    unsigned long len;
    char *data;
} __attribute__((packed));

struct mq_request {
    unsigned char event; // event type
    unsigned long long id; // request id
    unsigned long time; // issue time
    struct mq_data data; // payload
} __attribute__((packed));

struct mq_response {
    unsigned char event; // event type
    unsigned long long req_id; // request id to which the response belongs
    unsigned long long rsp_id; // response id
    unsigned long time; // issue time
    struct mq_data data; // payload
} __attribute__((packed));
struct mq_data represents the message payload and is common to requests and responses. The comments are descriptive enough. It is worth mentioning which event types are predefined in the current implementation:
enum mq_event {
    MQ_EVENT_REQUEST            = (1 << 0), // generic request to the client
    MQ_EVENT_REQUEST_NO_ACK     = (1 << 1), // request that doesn't require ACK from the client
    MQ_EVENT_RESPONSE           = (1 << 2), // response from the client
    MQ_EVENT_ACK                = (1 << 3), // ACK to the request that it was processed
    MQ_EVENT_EXIT               = (1 << 4), // call to terminate the client
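Since the payload is referenced through a pointer, which means nothing in another process, a message presumably has to travel over the pipe as a fixed-size header followed by the payload bytes. The sketch below shows one way of doing that; struct wire_header and all function names are assumptions made for this example and do not mirror the actual sources. Interrupted calls (EINTR) are not handled.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical on-the-wire header; the real layout may differ. */
struct wire_header {
    uint8_t  event; /* event type, e.g. a request or an ACK */
    uint64_t id;    /* request id */
    uint32_t len;   /* number of payload bytes following the header */
};

/* Write exactly len bytes, looping over short writes. */
static int write_full(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n <= 0)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Read exactly len bytes, looping over short reads. */
static int read_full(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n <= 0)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Send the header first, then the payload. */
static int send_message(int fd, uint8_t event, uint64_t id,
                        const void *payload, uint32_t len)
{
    struct wire_header h;

    memset(&h, 0, sizeof h); /* keep padding bytes deterministic */
    h.event = event;
    h.id = id;
    h.len = len;
    if (write_full(fd, &h, sizeof h) == -1)
        return -1;
    return write_full(fd, payload, len);
}

/* Receive one message; fails if the payload does not fit into cap bytes. */
static int recv_message(int fd, struct wire_header *h, void *payload, size_t cap)
{
    assert(h != NULL);
    if (read_full(fd, h, sizeof *h) == -1)
        return -1;
    if (h->len > cap)
        return -1;
    return read_full(fd, payload, h->len);
}
```

Header and payload go out in two writes here, which is fine as long as each pipe has a single writer, as in the one-pipe-per-direction design described earlier.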
The principle of operation is pretty simple. At the initialization stage the core process launches subprocesses that will handle messages; additionally, it launches a thread that will receive responses from the handlers:
#define HANDLERS 3
struct mq mq;
struct mq_handler handlers[HANDLERS];
int main(int argc, char **argv)
    int i;
    for (i=0;i<HANDLERS;++i)
        if (mq_launch_handler(&mq, &handlers[i], handler) != 0) {
            fprintf(stderr, "Failed to launch handler\n");
            goto out;
The struct mq used above defines the core message queue. Its definition is as follows:
struct mq {
    struct mq_list handlers;  // queue handlers
    struct mq_lock lock;      // management lock

    short state;              // state of the queue

    pthread_t rsp_manager;        // thread handle for managing responses
    struct mq_list rsp_list;  // list of responses
    pthread_mutex_t rsp_lock; // lock for the list of responses
struct mq_list is a generic doubly-linked list implementation; I will not discuss it here. The locking primitive struct mq_lock is built upon a POSIX mutex in conjunction with simple reference counting.
The main job of the response handling thread mentioned above is to fetch packets from the pipes, do some preprocessing (mostly needed for finer scheduling) and put them into a local queue for later use.
struct mq_handler contains the file descriptors for the pipes and a few fields for collecting statistics. Its definition is as follows:
struct mq_handler {
    struct mq_list list;
    struct mq_lock lock;

    int pipein[2]; // 0-> read end for obtaining request, 1-> write end for issuing request
    int pipeout[2]; // 0-> read end for obtaining response, 1-> write end for issuing response
    int process; // handler PID

    unsigned long pushed, // amount of pushed requests
        popped; // amount of popped responses
    unsigned nacked; // not acked packets
mq_init is used to initialize the message queue structure and launch the management thread:
mq_init(struct mq *mq)


    pthread_mutex_init(&mq->rsp_lock, NULL);

    mq->state = MQ_STATE_STARTING;

    pthread_create(&mq->rsp_manager, NULL, mq_poll_wrapper, mq);

    mq->state = MQ_STATE_RUNNING;
    mq_set_available(&mq->lock, 1);
This routine initializes miscellaneous state and announces that the queue is ready to go. mq_poll_wrapper triggers mq_poll, which fetches responses from the pipes, and cleans up 'old' responses: those whose timeout has expired are unlikely to be needed anymore, so the memory they use is freed to avoid a leak:
void *
mq_poll_wrapper(void *data)
    struct mq *mq = (struct mq *)data;

    unsigned int last_cleaned = timems(), now;

    for (;;) {
        if (mq->state == MQ_STATE_STOPPING) {

            return NULL;
        } else if (mq->state == MQ_STATE_STARTING) {





        now = timems();
        if (now - last_cleaned > MQ_MAX_RESPONSE_LIFE_TIME_MS) {
            last_cleaned = now;

            mq_cleanup_response_queue(mq, now);

    return NULL;
mq_launch_handler is a helper routine that creates the pipe endpoints for transmitting requests and responses and launches the request handling process. Its definition is as follows:
mq_launch_handler(struct mq *mq,
                  struct mq_handler *handler,
                  void (*f)(int endpoints[2]))
    int endpoints[2];

    if (pipe(handler->pipein) == -1) {

        goto fail_pipein;
    if (pipe(handler->pipeout) == -1) {

        goto fail_pipeout;

    endpoints[0] = handler->pipein[0];
    endpoints[1] = handler->pipeout[1];

    if ((handler->process = launch_process(f, endpoints)) == -1)
        goto fail_launch;


    handler->nacked = handler->popped = handler->pushed = 0;

    mq_list_add_tail(&mq->handlers, &handler->list);

    mq_set_available(&handler->lock, 1);

    return 0;


    return -1;
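launch_process is not shown here, so the following is only a guess at its shape: fork a child, run the handler function with the two endpoints, and exit. The sketch also does one request/response round trip to show how the endpoints are paired; launch_process_sketch, echo_next and round_trip are hypothetical names.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that runs f(endpoints) and then exits.
 * Returns the child's PID in the parent, or -1 on error. */
static pid_t launch_process_sketch(void (*f)(int endpoints[2]), int endpoints[2])
{
    pid_t pid = fork();

    if (pid == 0) {
        f(endpoints);
        _exit(0);
    }
    return pid;
}

/* Toy handler: read one byte from endpoints[0] and answer with the
 * following character on endpoints[1]. */
static void echo_next(int endpoints[2])
{
    char c;

    if (read(endpoints[0], &c, 1) != 1)
        _exit(1);
    ++c;
    if (write(endpoints[1], &c, 1) != 1)
        _exit(1);
}

/* Wire up two pipes the way mq_launch_handler does and perform one
 * round trip. Returns the reply byte, or -1 on error. */
static int round_trip(char request)
{
    int pipein[2], pipeout[2], endpoints[2];
    char reply = 0;
    int result = -1;
    pid_t pid;

    if (pipe(pipein) == -1)
        return -1;
    if (pipe(pipeout) == -1) {
        close(pipein[0]);
        close(pipein[1]);
        return -1;
    }
    endpoints[0] = pipein[0];  /* handler reads requests here */
    endpoints[1] = pipeout[1]; /* handler writes responses here */

    pid = launch_process_sketch(echo_next, endpoints);
    /* The dispatcher side does not need the handler's ends. */
    close(pipein[0]);
    close(pipeout[1]);
    if (pid != -1) {
        if (write(pipein[1], &request, 1) == 1 &&
            read(pipeout[0], &reply, 1) == 1)
            result = reply;
        waitpid(pid, NULL, 0);
    }
    close(pipein[1]);
    close(pipeout[0]);
    return result;
}
```

Closing the unused ends in the parent matters: if the handler dies, the parent's read then returns end-of-file instead of blocking forever on a write end it still holds itself.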
Now everything is ready for sending requests and receiving responses. Further details will be discussed in the following part.