Асинхронная работа с сокетами на C/C++ с libevent

5 февраля 2018

Помните, как когда-то мы писали простой TCP-сервер на C, а потом разбирали типичные ошибки? Описанный в этих статьях подход прекрасно работает, но только до тех пор, пока количество одновременно обслуживаемых соединений невелико — условно, пара сотен. Если же вам нужно обслуживать 10 или 50 тысяч соединений (так называемая проблема C10K), программу нужно писать совершенно иначе. Давайте разберемся, почему так, и как же нужно писать.

Суть проблемы

Почему же не работает просто держать 10 000 сокетов и обслуживать каждый из них в отдельном треде? Дело в том, что треды не бесплатны — операционная система выделяет под треды память, а также распределяет между ними время CPU. Переключение контекста конкретного ядра CPU с одного треда на другой — операция тоже не бесплатная. Это не проблема, когда тредов всего лишь несколько сотен. Но когда счет идет на тысячи тредов, больше времени начинает тратиться на переключение контекстов, чем на выполнение полезной работы.

Можно ли уменьшить число тредов и при этом продолжить обслуживать 10 000 соединений? Оказывается, что можно. Простейшее решение заключается в использовании системного вызова select(), который позволяет по списку файловых дескрипторов определить, какие из них стали готовы для заданного класса операций ввода-вывода. Возможности select(), правда, не безграничны — хорошо работает он только в случае, если файловых дескрипторов не больше сотни. Поэтому для решения той же задачи в современных операционных системах есть лишенные этого недостатка системные вызовы: в Linux это epoll, во FreeBSD — kqueue, в Windows — IOCP.

Поскольку на разных платформах системные вызовы разные, было разработано несколько библиотек, предоставляющих соответствующий слой абстракции. В качестве примеров можно привести библиотеки libevent, libev и libuv. Эти библиотеки имеют немного разные компромиссы. К примеру, libev не поддерживает Windows, потому что, действительно, кто же использует Windows на серверах, да еще и обслуживающих столько соединений?

В рамках этой заметки мы сосредоточим свое внимание на библиотеке libevent. Помимо прочего, libevent интересен тем, что имеет отличную документацию на Doxygen, кучу примеров, и даже небольшую бесплатную книжку. Библиотека используется в множестве известных проектов, таких, как Chromium, Memcached, Tor, Transmission и PgBouncer. Кроме того, у libevent есть ряд интересных фичей, например, встроенных асинхронный HTTP-сервер.

Использование libevent

Для примера напишем на базе libevent простенький чат. Клиенты цепляются к нему по TCP обычным telnet’ом и отправляют строчки. Строчку, отправленную одним клиентом, получают все остальные клиенты.

У меня код получился сравнительно большим, поэтому рассмотрим его по частям:

#define READ_BUFF_SIZE 128
#define WRITE_BUFF_SIZE ((READ_BUFF_SIZE)*8)

typedef struct connection_ctx_t {
    struct connection_ctx_t* next;
    struct connection_ctx_t* prev;

    evutil_socket_t fd;
    struct event_base* base;
    struct event* read_event;
    struct event* write_event;

    uint8_t read_buff[READ_BUFF_SIZE];
    uint8_t write_buff[WRITE_BUFF_SIZE];

    ssize_t read_buff_used;
    ssize_t write_buff_used;
} connection_ctx_t;

Эта структура представляет собой соединение пользователя. Все соединения объединяются в двусвязный список (поля next, prev). Каждое соединение хранит соответствующий ему сокет (fd), буфер принятых байт (read_buff, read_buff_used), а также буфер с байтами, готовыми к отправке (write_buff, write_buff_used). Есть также поля base, read_event и write_event, смысл которых станет понятен чуть ниже.

void run(char* host, int port) {
    // allocate memory for a connection ctx (used as linked list head)
    connection_ctx_t* head_ctx = (connection_ctx_t*)malloc(
                                   sizeof(connection_ctx_t));
    if(!head_ctx)
        error("malloc() failed");

    head_ctx->next = head_ctx;
    head_ctx->prev = head_ctx;
    head_ctx->write_event = NULL;
    head_ctx->read_buff_used = 0;
    head_ctx->write_buff_used = 0;

    // create a socket
    head_ctx->fd = socket(AF_INET, SOCK_STREAM, 0);
    if(head_ctx->fd < 0)
        error("socket() failed");

    // make it nonblocking
    if(evutil_make_socket_nonblocking(head_ctx->fd) < 0)
        error("evutil_make_socket_nonblocking() failed");

    // bind and listen
    struct sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port);
    sin.sin_addr.s_addr = inet_addr(host);
    if(bind(head_ctx->fd, (struct sockaddr*)&sin, sizeof(sin)) < 0)
        error("bind() failed");

    if(listen(head_ctx->fd, 1000) < 0)
        error("listen() failed");

    // create an event base
    struct event_base* base = event_base_new();
    if(!base)
        error("event_base_new() failed");

    // create a new event
    struct event* accept_event = event_new(base, head_ctx->fd,
        EV_READ | EV_PERSIST, on_accept, (void*)head_ctx);
    if(!accept_event)
        error("event_new() failed");

    head_ctx->base = base;
    head_ctx->read_event = accept_event;

    // schedule the execution of accept_event
    if(event_add(accept_event, NULL) < 0)
        error("event_add() failed");

    // run the event dispatching loop
    if(event_base_dispatch(base) < 0)
        error("event_base_dispatch() failed");

    // free allocated resources
    on_close(head_ctx);
    event_base_free(base);
}

Помимо обычной последовательности вызовов socket, bind, listen, созданный сокет здесь делается неблокирующим при помощи соответствующей процедуры из библиотеки libevent. Создается event_base, главный «класс» в libevent, благодаря которому осуществляется работа всего остального. Также создается событие с именем accept_event, выстреливающее, когда сокет становится доступен на чтение, то есть, в данном случае — готов принять новое соединение. Событие «включается» с помощью процедуры event_add, после чего выполнение программы уходит в недры процедуры event_base_dispatch, откуда при нормальном выполнении программы управление уже не вернется. Другими словами, этот код говорит «забиндить порт, и когда на него кто-то постучится, вызвать процедуру on_accept».

void on_accept(evutil_socket_t listen_sock, short flags, void* arg) {
    connection_ctx_t* head_ctx = (connection_ctx_t*)arg;
    evutil_socket_t fd = accept(listen_sock, 0, 0);

    if(fd < 0)
        error("accept() failed");

    // make in nonblocking
    if(evutil_make_socket_nonblocking(fd) < 0)
        error("evutil_make_socket_nonblocking() failed");

    connection_ctx_t* ctx = (connection_ctx_t*)malloc(
                              sizeof(connection_ctx_t));
    if(!ctx)
        error("malloc() failed");

    // add ctx to the linked list
    ctx->prev = head_ctx;
    ctx->next = head_ctx->next;
    head_ctx->next->prev = ctx;
    head_ctx->next = ctx;

    ctx->base = head_ctx->base;

    ctx->read_buff_used = 0;
    ctx->write_buff_used = 0;

    printf("[%p] New connection! fd = %d\n", ctx, fd);

    ctx->fd = fd;
    ctx->read_event = event_new(ctx->base, fd, EV_READ | EV_PERSIST,
                                on_read, (void*)ctx);
    if(!ctx->read_event)
        error("event_new(... EV_READ ...) failed");

    ctx->write_event = event_new(ctx->base, fd, EV_WRITE | EV_PERSIST,
                                 on_write, (void*)ctx);
    if(!ctx->write_event)
        error("event_new(... EV_WRITE ...) failed");

    if(event_add(ctx->read_event, NULL) < 0)
        error("event_add(read_event, ...) failed");
}

Эта процедура принимает входящее соединение с помощью системного вызова accept. Также она выделяет память для структуры connection_ctx_t, полностью заполняет ее и добавляет в двусвязных список всех открытых соединений. В процессе заполнения структуры с помощью процедуры event_new создаются события read_event и write_event по аналогии с тем, как это делалось в рассмотренной выше процедуре run. Из этих двух событий с помощью event_add «включается» только read_event. Другими словами, код говорит «принять соединение, добавить его в список, и когда что-то придет, дернуть процедуру on_read».

void on_read(evutil_socket_t fd, short flags, void* arg) {
    connection_ctx_t* ctx = arg;

    printf("[%p] on_read called, fd = %d\n", ctx, fd);

    ssize_t bytes;
    for(;;) {
        bytes = read(fd, ctx->read_buff + ctx->read_buff_used,
            READ_BUFF_SIZE - ctx->read_buff_used);
        if(bytes == 0) {
            printf("[%p] client disconnected!\n", ctx);
            on_close(ctx);
            return;
        }

        if(bytes < 0) {
            if(errno == EINTR)
                continue;

            printf("[%p] read() failed, errno = %d, "
                   "closing connection.\n", ctx, errno);
            on_close(ctx);
            return;
        }

        break; // read() succeeded
    }

    ssize_t check = ctx->read_buff_used;
    ssize_t check_end = ctx->read_buff_used + bytes;
    ctx->read_buff_used = check_end;

    while(check < check_end) {
        if(ctx->read_buff[check] != '\n') {
            check++;
            continue;
        }

        int length = (int)check;
        ctx->read_buff[length] = '\0';
        if((length > 0) && (ctx->read_buff[length - 1] == '\r')) {
            ctx->read_buff[length - 1] = '\0';
            length--;
        }

        on_string_received((const char*)ctx->read_buff, length, ctx);

        // shift read_buff (optimize!)
        memmove(ctx->read_buff, ctx->read_buff + check,
                check_end - check - 1);
        ctx->read_buff_used -= check + 1;
        check_end -= check;
        check = 0;
    }

    if(ctx->read_buff_used == READ_BUFF_SIZE) {
        printf("[%p] client sent a very long string, "
               "closing connection.\n", ctx);
        on_close(ctx);
    }
}

Процедура on_read вызывается, когда в одном из соединений есть данные, которые можно прочитать. Процедура считывает эти данные и помещает в read_buff. Когда в буфере оказывается строка, заканчивающаяся символом \n, он заменяется на \0, после чего вызывается on_string_received с полученной строкой, ее длиной и информацией о соединении в качестве аргументов. После обработки полученной строки содержимое буфера сдвигается. В случае возникновения ошибок (read_buff полностью заполнился, не удалось считать данные, …) клиентское соединение закрывается.

// called manually
void on_string_received(const char* str, int len,
                        connection_ctx_t* ctx) {
    printf("[%p] a complete string received: '%s', length = %d\n",
           ctx, str, len);

    connection_ctx_t* peer = ctx->next;
    while(peer != ctx) {
        if(peer->write_event == NULL) { // list head, skipping
            peer = peer->next;
            continue;
        }

        printf("[%p] sending a message to %p...\n", ctx, peer);

        // check that there is enough space in the write buffer
        if(WRITE_BUFF_SIZE - peer->write_buff_used < len + 1) {
            // if it's not, call on_close being careful with
            // the links in the linked list
            printf("[%p] unable to send a message to %p - "
                   "not enough space in the buffer; "
                   "closing %p's connection\n",
                   ctx,
                   peer,
                   peer);
            connection_ctx_t* next = peer->next;
            on_close(peer);
            peer = next;
            continue;
        }

        // append data to the buffer
        memcpy(peer->write_buff + peer->write_buff_used, str, len);
        peer->write_buff[peer->write_buff_used + len] = '\n';
        peer->write_buff_used += len + 1;

        // add writing event (it's not a problem to call it
        // multiple times)
        if(event_add(peer->write_event, NULL) < 0)
            error("event_add(peer->write_event, ...) failed");

        peer = peer->next;
    }
}

Процедура on_string_received берет принятую от одного из клиентов строку и записывает ее в конец write_buff остальных клиентов. Также для этих клиентов происходит «включение» событий write_event с помощью процедуры event_add. Таким образом мы говорим, что готовы что-то послать клиентам, если, конечно, они готовы что-то принять. В случае возникновения ошибки, например, нехватки места в write_buff, соответствующее соединение закрывается.

void on_write(evutil_socket_t fd, short flags, void* arg) {
    connection_ctx_t* ctx = arg;
    printf("[%p] on_write called, fd = %d\n", ctx, fd);

    ssize_t bytes;
    for(;;) {
        bytes = write(fd, ctx->write_buff, ctx->write_buff_used);
        if(bytes <= 0) {
            if(errno == EINTR)
                continue;

            printf("[%p] write() failed, errno = %d, "
                   "closing connection.\n", ctx, errno);
            on_close(ctx);
            return;
        }

        break; // write() succeeded
    }

    // shift the write_buffer (optimize!)
    memmove(ctx->write_buff, ctx->write_buff + bytes,
            ctx->write_buff_used - bytes);
    ctx->write_buff_used -= bytes;

    // if there is nothing to send call event_del
    if(ctx->write_buff_used == 0) {
        printf("[%p] write_buff is empty, "
               "calling event_del(write_event)\n", ctx);
        if(event_del(ctx->write_event) < 0)
            error("event_del() failed");
    }
}

Процедура on_write вызывается при включенном событии write_event (то есть, когда write_buff не пуст), когда сокет готов к передаче данных. Процедура просто записывает в сокет столько, сколько получится записать, и сдвигает write_buff на соответствующее количество байт. Если буфер при этом становится пустым, событие write_event выключается с помощью процедуры event_del. Если этого не сделать, on_write будет постоянно вызываться с пустым буфером на отправку, лишь зря грея CPU.

// called manually
void on_close(connection_ctx_t* ctx) {
    printf("[%p] on_close called, fd = %d\n", ctx, ctx->fd);

    // remove ctx from the lined list
    ctx->prev->next = ctx->next;
    ctx->next->prev = ctx->prev;

    event_del(ctx->read_event);
    event_free(ctx->read_event);

    if(ctx->write_event) {
        event_del(ctx->write_event);
        event_free(ctx->write_event);
    }

    close(ctx->fd);
    free(ctx);
}

Наконец, on_close вызывается при закрытии соединения — штатном или в случае той или иной ошибки. Информация о соединении удаляется из двусвязного списка, все выделенные ресурсы освобождаются, сокет закрывается.

Как видите, приведенный код довольно прост, во всяком случае, концептуально. Я протестировал его на Linux и FreeBSD как локально, так и по сети, и не смог выявить каких-либо дефектов. Желающие могут проверить его самостоятельно, в том числе и на Windows, и в случае необходимости прислать мне pull request. Репозиторий, как обычно, вы найдете на GitHub. Помимо приведенного выше кода в репозитории вы также найдете и небольшую тестовую утилиту, написанную на Go.

Заключение

Приведенный материал не претендует на рассмотрение абсолютно всех возможностей libevent. Помимо прочего, за кадром остались такие возможности, как I/O Buffers, фреймворк для построения RPC, функции для асинхронной работы с DNS, а также уже упомянутый асинхронный HTTP-сервер. В качестве домашнего задания вы можете изучить какую-нибудь из этих возможностей. Документацию и примеры вы найдете на официальном сайте libevent’а.

Также вы можете доработать мой пример, сделав из него, к примеру, очередь сообщений типа RabbitMQ — с топиками, подписками на них, и вот этим вот всем. При желании можно даже реализовать протокол AMQP. Вообще, если вы ищите идею для проекта на базе libevent или одной из его альтернатив, можно взять произвольный протокол (HTTP/2, WebSockets, Socks5, SMTP, FTP, IRC, …) и реализовать соответствующий сервер, клиент, или и то, и другое.

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.