Многопоточный UDP-сервер с epoll?

Я хотел бы разработать многопоточный сервер UDP в C/Linux. Служба работает на одном порту x, поэтому есть возможность привязать к ней только один UDP-сокет. Чтобы работать при высоких нагрузках, у меня есть n потоков (статически определенных), скажем, 1 поток на процессор. Работа может быть доставлена ​​в поток с помощью epoll_wait, поэтому потоки пробуждаются по требованию с помощью «EPOLLET | ЭПОЛЛОНЕШОТ. Я прикрепил пример кода:

static int epfd;
static sig_atomic_t sigint = 0;

...

/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
    int rc, i, sock, nfds;
    struct epoll_event ep, *events = (struct epoll_event *) pevents;

    while (!sigint) {
        nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);

        for (i = 0; i < nfds; ++i) {
           if (events[i].data.fd < 0)
                continue;

           sock = events[i].data.fd;

           if((events[i].events & EPOLLIN) == EPOLLIN) {
               printf("Event dispatch!\n");
               handle_request(sock); // do a recvfrom
           } else
               whine("Unknown poll event!\n");

           memset(&ep, 0, sizeof(ep));
           ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
           ep.data.fd = sock;

           rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
           if(rc < 0)
               error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
       }
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv)
{
    int rc, i, cpu, sock, opts;
    struct sockaddr_in sin;
    struct epoll_event ep, *events;
    char *local_addr = "192.168.1.108";
    void *status;
    pthread_t *threads = NULL;
    cpu_set_t cpuset;

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);

    sock = socket(PF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        error_and_die(EXIT_FAILURE, "Cannot create socket!\n");

    /* Non-blocking */
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
        error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
    opts |= O_NONBLOCK;
    rc = fcntl(sock, F_SETFL, opts);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");

    /* Initial epoll setup */
    epfd = epoll_create(MAX_EVENT_NUM);
    if(epfd < 0)
        error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");

    memset(&ep, 0, sizeof(ep));
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ep.data.fd = sock;

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");

    /* Socket binding */
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(local_addr);
    sin.sin_port = htons(port_xy);

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
    if (rc < 0)
        error_and_die(EXIT_FAILURE, "Problem binding to port! "
                      "Already in use?\n");

    register_signal(SIGINT, &signal_handler);

    /* Thread initialization */
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_create(&threads[i], NULL, process_clients, events);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);

        rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        cpu = (cpu + 1) % NR_CPUS_ON;
    }

    printf("up and running!\n");

    /* Thread joining */
    for (i = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_join(threads[i], &status);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
    }

    close(sock);
    xfree(threads);
    xfree(events);

    printf("shut down!\n");

    return 0;
}

Это правильный способ обработки этого сценария с помощью epoll? Должна ли функция _handle_request_ возвращаться как можно быстрее, потому что на это время очередь событий для сокета заблокирована?!

Спасибо за ответы!


person Daniel    schedule 18.10.2010    source источник


Ответы (2)


Поскольку вы используете только один сокет UDP, нет смысла использовать epoll — вместо этого используйте блокирующий recvfrom.

Теперь, в зависимости от протокола, который вам нужно обрабатывать — если вы можете обрабатывать каждый пакет UDP по отдельности — вы можете фактически вызывать recvfrom одновременно из нескольких потоков (в пуле потоков). ОС позаботится о том, чтобы ровно один поток получил UDP-пакет. Затем этот поток может делать все, что ему нужно, в handle_request.

Однако, если вам нужно обрабатывать UDP-пакеты в определенном порядке, у вас, вероятно, не будет столько возможностей для параллелизации вашей программы...

person cmeerw    schedule 18.10.2010
comment
Именно то, что я хотел сказать :) - person MarkR; 19.10.2010
comment
Обновление на 2018 год. Хотя вы можете иметь N потоков, совместно использующих 1 сокет UDP, существует большое количество конфликтов спин-блокировок, поскольку в сокет добавляется больше потоков. Вы можете получить лучший прирост производительности, используя отдельный сокет UDP для каждого потока. Чтобы каждый сокет использовал тот же порт, что и другой, используйте параметр ioctl SO_REUSEPORT. Подробнее здесь: здесь можно отлично прочитать. - person selbie; 25.11.2018

Нет, это не будет работать так, как вы хотите. Чтобы рабочие потоки обрабатывали события, поступающие через интерфейс epoll, вам нужна другая архитектура.

Пример дизайна (есть несколько способов сделать это) Используется: семафоры SysV/POSIX.

  • Пусть главный поток создаст n подпотоков и семафор, а затем заблокирует опрос ваших сокетов (или что-то еще).

  • Пусть каждый подпоток блокируется при отключении семафора.

  • Когда главный поток разблокируется, он сохраняет события в некоторой глобальной структуре и запускает семафор один раз для каждого события.

  • Подпотоки разблокируются, обрабатывают события и снова блокируются, когда семафор возвращается к 0.

Вы можете использовать канал, совместно используемый всеми потоками, для достижения функциональности, очень похожей на функциональность семафора. Это позволит вам заблокировать select() вместо семафора, который вы можете использовать для пробуждения потоков по какому-либо другому событию (тайм-ауты, другие каналы и т. д.).

Вы также можете отменить этот элемент управления и заставить главный поток просыпаться, когда его рабочие процессы требуют выполнения задач. Я думаю, что описанный выше подход лучше подходит для вашего случая.

person slezica    schedule 18.10.2010
comment
Разве это не похоже на то, что делает epoll внутри? Он имеет своего рода очередь событий и диспетчер событий. потоки пробуждаются по запросу с помощью epoll_wait?! Я прочитал это объявление на LKML, где у разработчика PowerDNS был аналогичный вопрос (gossamer- threads.com/lists/linux/kernel/1197050)... - person Daniel; 18.10.2010
comment
Вы заставили меня усомниться... Однако я совершенно уверен, что наличие нескольких потоков, ожидающих одного и того же дескриптора epoll, вызывает проблему Thundering Herd. Подождите, я не совсем уверен сейчас. Черт. - person slezica; 18.10.2010
comment
Вы получаете громоподобное стадо только в том случае, если не используете EPOLLET (срабатывает по фронту). Кстати, в зависимости от того, что вы делаете в handle_request, вы, вероятно, можете обойтись без использования EPOLLONESHOT. Но тем не менее, epoll не имеет особого смысла, если у вас есть только один сокет. - person cmeerw; 18.10.2010
comment
Спасибо за Ваш ответ! Как упоминалось в ссылке, мне было интересно узнать о PowerDNS Recursor, который представляет собой UDP-сервер, работающий на определенном порту с использованием epoll и пула потоков. Очевидно, что это очень производительно, но, может быть, я просто ошибаюсь... хм. - person Daniel; 18.10.2010
comment
Случай с PowerDNS немного отличается (по крайней мере, отличается от того, что вы сказали нам в своем первоначальном вопросе). PowerDNS использует один сокет для получения клиентских запросов, но затем также создает сокеты для исходящих запросов/ответов (на восходящие серверы) — и поэтому необходимо использовать epoll. - person cmeerw; 19.10.2010