Event-driven program flow is common for interactive systems. Events usually come into the system in a non-deterministic way, and it is not always known how long handling an event will take. Moreover, new events may arrive while the current one is still being processed. It is therefore better to handle events in parallel to avoid piling up pending events.
Some message interchange mechanism is needed to transmit events between the main dispatcher and the handlers.
There are different implementations of message queues, but here I'd like to reinvent the wheel and show that there is still room for imagination.
In this message queue implementation a pipe is used as the transmission path for messages.
A pipe is a generic mechanism to exchange data between processes. It is a unidirectional channel, a bridge, between two processes. One process writes data into the write end of the pipe while another reads the written data from the read end. In most cases there is a writer on one side and a reader on the other.
The concept of one reader/one writer could be extended to multiple readers/multiple writers, but this would complicate things. To keep things simple, the message queue needs two separate pipes per handler: one for sending requests from the dispatcher's side and one for sending responses from the handler's side.
Unlike a FIFO (a named pipe visible in userspace as a special file in the filesystem), a pipe created by a process is visible only to the children of that process (and to the process itself, of course). This is a consequence of how a pipe is represented: a file descriptor for each end.
A pipe is created by calling the pipe routine from the C library:
int pipe(int pipefd[2]);
Calling pipe yields two file descriptors: one refers to the read end of the pipe, the other to the write end. In most cases this call directly invokes a system call of the same name.
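Though not part of the message queue code itself, a minimal sketch of typical usage may help: the parent writes into pipefd[1] and a forked child reads from pipefd[0].

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
	int pipefd[2];
	char buf[32];

	if (pipe(pipefd) == -1) {
		perror("pipe");
		exit(EXIT_FAILURE);
	}
	switch (fork()) {
	case -1:
		perror("fork");
		exit(EXIT_FAILURE);
	case 0:	/* child: reader */
		close(pipefd[1]);	/* close unused write end */
		if (read(pipefd[0], buf, sizeof(buf)) > 0)
			printf("child got: %s\n", buf);
		close(pipefd[0]);
		_exit(0);
	default:	/* parent: writer */
		close(pipefd[0]);	/* close unused read end */
		write(pipefd[1], "ping", 5);
		close(pipefd[1]);
		wait(NULL);
	}
	return 0;
}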
From the Linux kernel sources it is clear that control is immediately passed from sys_pipe to the sys_pipe2 system call:
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{
	int fd[2];
	int error;

	error = do_pipe_flags(fd, flags);
	...
}

SYSCALL_DEFINE1(pipe, int __user *, fildes)
{
	return sys_pipe2(fildes, 0);
}
For now the flags are of no interest to us, which is why sys_pipe is used: it passes zero as the flags argument to sys_pipe2.
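For completeness: when the flags do matter, userspace can pass them directly through pipe2 (exposed by glibc when _GNU_SOURCE is defined). A tiny sketch:

#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int pipefd[2];

	/* Same as pipe(), but both ends are created non-blocking
	 * and close-on-exec in a single call. */
	if (pipe2(pipefd, O_NONBLOCK | O_CLOEXEC) == -1) {
		perror("pipe2");
		return 1;
	}
	close(pipefd[0]);
	close(pipefd[1]);
	return 0;
}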
For us the most interesting part is hidden in the do_pipe_flags routine; the relevant piece is highlighted below:
int do_pipe_flags(int *fd, int flags)
{
	struct file *fw, *fr;
	...
	fw = create_write_pipe(flags);
	...
	fr = create_read_pipe(fw, flags);
	...
}
Before going further I would like to make a small remark on how pipes are managed inside the Linux kernel. The kernel manages pipes as files on the pseudo filesystem pipefs. This filesystem cannot be mounted from userspace because it has the MS_NOUSER flag, but it is visible from the kernel under the pipe: 'mountpoint':
static struct vfsmount *pipe_mnt;
...
static int pipefs_get_sb(struct file_system_type *fs_type,
			 int flags, const char *dev_name, void *data,
			 struct vfsmount *mnt)
{
	return get_sb_pseudo(fs_type, "pipe:", NULL, PIPEFS_MAGIC, mnt);
}

static struct file_system_type pipe_fs_type = {
	.name		= "pipefs",
	.get_sb		= pipefs_get_sb,
	.kill_sb	= kill_anon_super,
};
static int __init init_pipe_fs(void)
{
	int err = register_filesystem(&pipe_fs_type);

	if (!err) {
		pipe_mnt = kern_mount(&pipe_fs_type);
		...
	}
	...
}
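Even though pipefs cannot be mounted, the "pipe:" name does leak into userspace: /proc/self/fd entries for pipe descriptors resolve to links like pipe:[inode]. A small illustration:

#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int pipefd[2];
	char path[64], target[64];
	ssize_t n;

	if (pipe(pipefd) == -1) {
		perror("pipe");
		return 1;
	}
	snprintf(path, sizeof(path), "/proc/self/fd/%d", pipefd[0]);
	n = readlink(path, target, sizeof(target) - 1);
	if (n > 0) {
		target[n] = '\0';
		/* prints something like: /proc/self/fd/3 -> pipe:[123456] */
		printf("%s -> %s\n", path, target);
	}
	return 0;
}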
In the do_pipe_flags routine, create_write_pipe allocates an inode on pipefs with the help of get_pipe_inode, which in turn calls alloc_pipe_info:
static struct inode *get_pipe_inode(void)
{
	struct inode *inode = new_inode(pipe_mnt->mnt_sb);
	struct pipe_inode_info *pipe;
	...
	pipe = alloc_pipe_info(inode);
	...
}
Besides initializing pipe-specific data, alloc_pipe_info creates a waitqueue, which is of some interest for the message queue (as will be shown below):
struct pipe_inode_info *alloc_pipe_info(struct inode *inode)
{
	...
	init_waitqueue_head(&pipe->wait);
	...
	return pipe;
}
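This waitqueue is what a reader sleeps on when the pipe is empty (and a writer when it is full). From userspace the effect is simply that read blocks until data arrives, unless O_NONBLOCK is set, in which case it fails with EAGAIN:

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int pipefd[2];
	char c;

	if (pipe(pipefd) == -1) {
		perror("pipe");
		return 1;
	}
	/* Make the read end non-blocking so an empty pipe does not
	 * put us to sleep on the pipe's waitqueue. */
	fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
	if (read(pipefd[0], &c, 1) == -1 && errno == EAGAIN)
		printf("pipe is empty, a blocking read would sleep\n");

	/* In the blocking case, a write like this wakes up readers
	 * sleeping on the pipe's waitqueue. */
	write(pipefd[1], "x", 1);
	if (read(pipefd[0], &c, 1) == 1)
		printf("got '%c'\n", c);
	return 0;
}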
Returning to create_write_pipe, the kernel creates a new file with write-only flags and the write_pipefifo_fops file operations:
const struct file_operations write_pipefifo_fops = {
	.llseek		= no_llseek,
	.read		= bad_pipe_r,
	.write		= do_sync_write,
	.aio_write	= pipe_write,
	...
	.open		= pipe_write_open,
	...
};
...
f = alloc_file(pipe_mnt, dentry, FMODE_WRITE, &write_pipefifo_fops);
...
f->f_flags = O_WRONLY | (flags & O_NONBLOCK);
This is the write end of the pipe. create_read_pipe, used to create the read end of the pipe, reuses the job already done by create_write_pipe:
struct file *create_read_pipe(struct file *wrf, int flags)
{
	struct file *f = get_empty_filp();
	...
	/* Grab pipe from the writer */
	f->f_path = wrf->f_path;
	path_get(&wrf->f_path);
	f->f_mapping = wrf->f_path.dentry->d_inode->i_mapping;

	f->f_pos = 0;
	f->f_flags = O_RDONLY | (flags & O_NONBLOCK);
	f->f_op = &read_pipefifo_fops;
	...
}
The inode mapping is unsurprisingly reused from the write end, but the file operations differ:
const struct file_operations read_pipefifo_fops = {
	.llseek		= no_llseek,
	.read		= do_sync_read,
	.aio_read	= pipe_read,
	.write		= bad_pipe_w,
	...
	.open		= pipe_read_open,
	...
};
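The bad_pipe_r and bad_pipe_w stubs simply return -EBADF, so using an end in the wrong direction is rejected. A quick check from userspace:

#include <errno.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int pipefd[2];
	char c = 'x';

	if (pipe(pipefd) == -1) {
		perror("pipe");
		return 1;
	}
	/* Writing to the read end or reading from the write end
	 * fails with EBADF. */
	if (write(pipefd[0], &c, 1) == -1)
		perror("write to read end");	/* Bad file descriptor */
	if (read(pipefd[1], &c, 1) == -1)
		perror("read from write end");	/* Bad file descriptor */
	return 0;
}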
Then, with the help of get_unused_fd_flags, the file descriptors that will be returned to userspace upon successful completion of the sys_pipe system call are created.
There is no need to stay in kernel mode any longer. Let's return to the userspace implementation of the message queue.
The basic structures used to form message requests and responses are divided into header and payload parts:
struct mq_data {
	unsigned long len;
	char *data;
} __attribute__((packed));

struct mq_request {
	unsigned char event;		// event type
	unsigned long long id;		// request id
	unsigned long time;		// issue time
	struct mq_data data;		// payload
} __attribute__((packed));

struct mq_response {
	unsigned char event;		// event type
	unsigned long long req_id;	// request id to which the response belongs
	unsigned long long rsp_id;	// response id
	unsigned long time;		// issue time
	struct mq_data data;		// payload
} __attribute__((packed));
struct mq_data represents the message payload and is common to both requests and responses. The comments are descriptive enough. It is worth mentioning which event types are predefined in the current implementation:
enum mq_event {
	MQ_EVENT_REQUEST = (1 << 0),		// generic request to the client
	MQ_EVENT_REQUEST_NO_ACK = (1 << 1),	// request that doesn't require ACK from the client
	MQ_EVENT_RESPONSE = (1 << 2),		// response from the client
	MQ_EVENT_ACK = (1 << 3),		// ACK to the request that it was processed
	MQ_EVENT_EXIT = (1 << 4),		// call to terminate the client
};
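One thing to note about these structures: struct mq_data carries a pointer, so a message cannot cross the pipe with a single write of the struct; the fixed-size header and the variable-length payload have to be serialized one after the other. The actual send routine is not listed in this part; a hypothetical sketch of the idea, relying on the structures above, could look like this:

#include <unistd.h>

/*
 * Hypothetical helper, not part of the original implementation:
 * the fixed-size header goes first, then the payload bytes that
 * req->data.data points to.
 */
static int mq_send_request(int fd, const struct mq_request *req)
{
	if (write(fd, req, sizeof(*req)) != (ssize_t)sizeof(*req))
		return -1;
	if (req->data.len &&
	    write(fd, req->data.data, req->data.len) != (ssize_t)req->data.len)
		return -1;
	return 0;
}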
The principle of operation is pretty simple. At the initialization stage the core process launches subprocesses that will handle messages; additionally it launches a thread that will receive responses from the handlers:
#define HANDLERS 3

struct mq mq;
struct mq_handler handlers[HANDLERS];

int main(int argc, char **argv)
{
	int i;
	....
	mq_init(&mq);
	....
	for (i = 0; i < HANDLERS; ++i)
		if (mq_launch_handler(&mq, &handlers[i], handler) != 0) {
			fprintf(stderr, "Failed to launch handler\n");
			goto out;
		}
	....
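The handler function passed to mq_launch_handler is not listed in this part. Conceptually it reads requests from endpoints[0] and writes responses back through endpoints[1]; a hypothetical skeleton (payload handling omitted) might look like this:

#include <string.h>
#include <unistd.h>

/*
 * Hypothetical handler skeleton, not the original implementation:
 * endpoints[0] is the read end for requests,
 * endpoints[1] is the write end for responses.
 */
void handler(int endpoints[2])
{
	struct mq_request req;
	struct mq_response rsp;

	while (read(endpoints[0], &req, sizeof(req)) == (ssize_t)sizeof(req)) {
		if (req.event == MQ_EVENT_EXIT)
			break;
		memset(&rsp, 0, sizeof(rsp));
		rsp.event = MQ_EVENT_RESPONSE;
		rsp.req_id = req.id;
		/* ... do the actual work and fill rsp.data here ... */
		write(endpoints[1], &rsp, sizeof(rsp));
	}
}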
The struct mq shown above defines the core message queue. Its definition is as follows:
struct mq {
	struct mq_list handlers;	// queue handlers
	struct mq_lock lock;		// management lock
	short state;			// state of the queue
	pthread_t rsp_manager;		// thread handle for managing responses
	struct mq_list rsp_list;	// list of responses
	pthread_mutex_t rsp_lock;	// lock for the list of responses
};
struct mq_list is a generic doubly linked list implementation; I will not bother discussing it here. The locking primitive struct mq_lock is built upon a POSIX mutex in conjunction with simple reference counting; a possible shape is sketched below.
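The definition of struct mq_lock is not reproduced here; judging by the operations used below (mq_lock_init, mq_lock_lock, mq_lock_unlock, mq_set_available), one purely illustrative shape is:

#include <pthread.h>

/* Illustrative sketch only; the real struct mq_lock may differ. */
struct mq_lock {
	pthread_mutex_t mutex;	// protects the guarded object
	int available;		// the object is ready for use
	unsigned refcount;	// simple reference counting
};

static void mq_lock_init(struct mq_lock *l)
{
	pthread_mutex_init(&l->mutex, NULL);
	l->available = 0;
	l->refcount = 0;
}

static void mq_lock_lock(struct mq_lock *l)
{
	pthread_mutex_lock(&l->mutex);
}

static void mq_lock_unlock(struct mq_lock *l)
{
	pthread_mutex_unlock(&l->mutex);
}

static void mq_set_available(struct mq_lock *l, int available)
{
	mq_lock_lock(l);
	l->available = available;
	mq_lock_unlock(l);
}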
The main job of the response handling thread mentioned above is to fetch packets from the pipe, do some preprocessing (mostly needed for finer scheduling) and put them into a local queue for later use.
struct mq_handler contains the file descriptors for the pipes and a few fields for collecting statistics. Its definition is as follows:
struct mq_handler {
	struct mq_list list;
	struct mq_lock lock;
	int pipein[2];		// 0 -> read end for obtaining requests, 1 -> write end for issuing requests
	int pipeout[2];		// 0 -> read end for obtaining responses, 1 -> write end for issuing responses
	int process;		// handler PID
	unsigned long pushed,	// amount of pushed requests
		      popped;	// amount of popped responses
	unsigned nacked;	// not acked packets
};
mq_init, which appeared above, initializes the message queue structure and launches the management thread:
void
mq_init(struct mq *mq)
{
	mq_list_init(&mq->handlers);
	mq_list_init(&mq->rsp_list);
	mq_lock_init(&mq->lock);
	pthread_mutex_init(&mq->rsp_lock, NULL);

	mq->state = MQ_STATE_STARTING;
	pthread_create(&mq->rsp_manager, NULL, mq_poll_wrapper, mq);
	mq->state = MQ_STATE_RUNNING;
	mq_set_available(&mq->lock, 1);
}
This routine initializes miscellaneous bits and announces that the queue is ready to go.
mq_poll_wrapper triggers mq_poll, which fetches responses from the pipe, and also cleans up 'old' responses that are unlikely to be needed anymore because their timeout has expired; to avoid a memory leak the memory they occupy should be freed:
void *
mq_poll_wrapper(void *data)
{
	struct mq *mq = (struct mq *)data;
	unsigned int last_cleaned = timems(), now;

	for (;;) {
		mq_lock_lock(&mq->lock);
		if (mq->state == MQ_STATE_STOPPING) {
			mq_lock_unlock(&mq->lock);
			return NULL;
		} else if (mq->state == MQ_STATE_STARTING) {
			mq_lock_unlock(&mq->lock);
			release_cpu();
			continue;
		}
		mq_lock_unlock(&mq->lock);

		mq_poll(mq);

		now = timems();
		if (now - last_cleaned > MQ_MAX_RESPONSE_LIFE_TIME_MS) {
			last_cleaned = now;
			mq_cleanup_response_queue(mq, now);
		}
	}

	return NULL;
}
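mq_poll itself is not listed in this part. In essence it has to multiplex over the handlers' response pipes, read complete responses and append them to mq->rsp_list under rsp_lock. A rough, simplified sketch of that idea (iterating the global handlers array from the main snippet for brevity, with payload reading and error handling omitted):

#include <poll.h>
#include <pthread.h>
#include <unistd.h>

/*
 * Simplified illustration of the polling idea, not the original mq_poll:
 * wait briefly on each handler's response pipe and collect whatever
 * header arrived.
 */
static void mq_poll_sketch(struct mq *mq)
{
	struct mq_response rsp;
	struct pollfd pfd;
	int i;

	for (i = 0; i < HANDLERS; ++i) {
		pfd.fd = handlers[i].pipeout[0];	// read end for responses
		pfd.events = POLLIN;
		if (poll(&pfd, 1, 10) > 0 && (pfd.revents & POLLIN) &&
		    read(pfd.fd, &rsp, sizeof(rsp)) == (ssize_t)sizeof(rsp)) {
			pthread_mutex_lock(&mq->rsp_lock);
			/* a real implementation would copy rsp into a node
			 * and link it into mq->rsp_list here */
			pthread_mutex_unlock(&mq->rsp_lock);
			handlers[i].popped++;
		}
	}
}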
mq_launch_handler is a helper routine that creates the pipe endpoints for transmitting requests and responses and launches the request handling process. Its definition is as follows:
int
mq_launch_handler(struct mq *mq,
		  struct mq_handler *handler,
		  void (*f)(int endpoints[2]))
{
	int endpoints[2];

	if (pipe(handler->pipein) == -1) {
		perror("pipe");
		goto fail_pipein;
	}
	if (pipe(handler->pipeout) == -1) {
		perror("pipe");
		goto fail_pipeout;
	}

	endpoints[0] = handler->pipein[0];
	endpoints[1] = handler->pipeout[1];
	if ((handler->process = launch_process(f, endpoints)) == -1)
		goto fail_launch;

	mq_lock_init(&handler->lock);
	handler->nacked = handler->popped = handler->pushed = 0;

	mq_lock_lock(&mq->lock);
	mq_list_add_tail(&mq->handlers, &handler->list);
	mq_lock_unlock(&mq->lock);

	mq_set_available(&handler->lock, 1);
	return 0;

fail_launch:
	close(handler->pipeout[0]);
	close(handler->pipeout[1]);
fail_pipeout:
	close(handler->pipein[0]);
	close(handler->pipein[1]);
fail_pipein:
	return -1;
}
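launch_process is not listed either; presumably it forks, runs the handler on the two endpoints in the child, and returns the child's PID to the dispatcher. A possible sketch:

#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical sketch of launch_process: fork, run the handler on the
 * two endpoints in the child, return the child's PID to the dispatcher.
 */
static int launch_process(void (*f)(int endpoints[2]), int endpoints[2])
{
	pid_t pid = fork();

	if (pid == -1) {
		perror("fork");
		return -1;
	}
	if (pid == 0) {		/* child: becomes the handler */
		f(endpoints);
		_exit(0);
	}
	return (int)pid;	/* parent: dispatcher keeps the PID */
}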
Now everything is ready for sending requests and receiving responses. Further details will be discussed in the following part.