
Preliminary Look into libevent 2 for Windows

The libevent library is probably best known for its use in memcached, one of the key components of the LAMP stack. The recent trend, however, favors NoSQL solutions in which the typical RDBMS and memcached are replaced with more optimized, dedicated systems. That trend does not diminish the importance of libevent at all: it is not tied to any particular application and can be used for NoSQL solutions as well. Tor is another user of libevent, and libevent is crucial to Tor's cross-platform availability. One of the developers of libevent is Nick Mathewson, who is also a key person behind the Tor project.

My interest in libevent stems from my own project, DICE. Since development began in 2002, its I/O has been fully optimized for Windows and I/O Completion Ports (IOCP), without any middleware or libraries. Even though it works flawlessly in its current state, replacing that layer with a decent OSS library could make it more robust and future-proof. My only concern was performance, but libevent 2 is supposed to come with IOCP support, and that upcoming support is what led me to start evaluating the library.

The libevent version for this article is 2.0.7 RC. I built it with Visual Studio 2010 (VC10). Since I wanted to trace its flow in the debugger, I made a debug build by changing the CFLAGS compiler option /Ox to /Od /MDd /Zi in Makefile.nmake, in both the main directory and the test directory. Running 'nmake -f Makefile.nmake' produces the libevent.lib static library.

Before building a project with libevent, you need to manually copy WIN32-Code\event2\event-config.h to include\event2\event-config.h. Then add libevent.lib to the linker input (and ws2_32.lib too if you use sockets), and add the libevent and libevent\include directories to the include path.

Then I wrote a test program using the evhttp ('Event-driven HTTP servers') functions. It's a simple web server that shows the requested URI and quits if /quit is requested. It listens on port 8080.



#include <winsock2.h>  // WSAStartup, WSACleanup
#include <tchar.h>     // _tmain, _TCHAR

#include <iostream>
#include <cmath>
#include <ctime>

#include <event.h>
#include <evhttp.h>
#include <event2/thread.h>

void handlerRoot(evhttp_request* request, void* args)
{
 std::cout << "Request URI: " << request->uri << std::endl;

 evbuffer* buffer = evbuffer_new();

 evkeyvalq* headers = evhttp_request_get_output_headers(request);
 evhttp_add_header(headers, "Content-Type", "text/html; charset=UTF-8");
 evbuffer_add_printf(buffer, "Request URI: %s", request->uri);

 evhttp_send_reply(request, HTTP_OK, "OK", buffer);

 evbuffer_free(buffer);
}

void handlerQuit(evhttp_request* request, void* args)
{
 event_base* eventBase = (event_base*)args;
 event_base_loopbreak(eventBase);
}

int _tmain(int argc, _TCHAR* argv[])
{
 // Winsock has to be initialized before any socket call.
 WSADATA wsaData = {0};
 ::WSAStartup(MAKEWORD(2, 2), &wsaData);

 // Enable locking before the event_base is created, so that the base
 // is set up for use from more than one thread.
 evthread_use_windows_threads();

 // IOCP is disabled by default and has to be requested explicitly.
 event_config* eventConfig = event_config_new();
 event_config_set_flag(eventConfig, EVENT_BASE_FLAG_STARTUP_IOCP);
 event_config_set_num_cpus_hint(eventConfig, 8);

 event_base* eventBase = event_base_new_with_config(eventConfig);
 event_config_free(eventConfig);

 evhttp* eventHttp = evhttp_new(eventBase);

 // Every URI goes to handlerRoot except /quit.
 evhttp_set_gencb(eventHttp, handlerRoot, eventBase);
 evhttp_set_cb(eventHttp, "/quit", handlerQuit, eventBase);

 evhttp_bind_socket(eventHttp, "0.0.0.0", 8080);

 timeval tv = {::pow(2.0, 31) - ::time(NULL) - 1000, 0};

 event_base_loopexit(eventBase, &tv);

 event_base_dispatch(eventBase);

 evhttp_free(eventHttp);
 event_base_free(eventBase);

 ::WSACleanup();

 return 0;
}
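
The routing the test program relies on is simple: a callback registered for a specific path wins, and every other request falls through to the generic callback. Below is a minimal, libevent-free sketch of that lookup, assuming exact string matching on the path. The Router type, its members, and make_test_router are hypothetical names for illustration only, not libevent API.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for evhttp's callback table; not libevent API.
class Router {
public:
    typedef std::function<std::string(const std::string&)> Handler;

    // Plays the role of evhttp_set_cb: a handler for one exact path.
    void set_cb(const std::string& path, Handler h) {
        assert(static_cast<bool>(h));  // an empty handler would be a bug
        exact_[path] = std::move(h);
    }

    // Plays the role of evhttp_set_gencb: the fallback for all other paths.
    void set_gencb(Handler h) { generic_ = std::move(h); }

    // Exact match first, then the generic handler, else a 404-style marker.
    std::string dispatch(const std::string& path) const {
        std::map<std::string, Handler>::const_iterator it = exact_.find(path);
        if (it != exact_.end()) return it->second(path);
        if (generic_) return generic_(path);
        return "404";
    }

private:
    std::map<std::string, Handler> exact_;
    Handler generic_;
};

// Builds a router wired the same way as the test program.
inline Router make_test_router() {
    Router r;
    r.set_gencb([](const std::string& path) { return "Request URI: " + path; });
    r.set_cb("/quit", [](const std::string&) { return std::string("quit"); });
    return r;
}
```

With this wiring, make_test_router().dispatch("/quit") reaches the quit handler, while any other path is echoed back by the generic one.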


The documentation does not yet cover the recently added functions, so I had to read the libevent code to find the function calls that enable IOCP. As whatsnew-2.0.txt suggests, IOCP is disabled by default. The weirdest part is probably this:

 timeval tv = {::pow(2.0, 31) - ::time(NULL) - 1000, 0};

 event_base_loopexit(eventBase, &tv);

 event_base_dispatch(eventBase);


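event_base_dispatch starts the event dispatching loop, but with IOCP enabled it doesn't block and the program just exits, so you have to set the loop's lifetime manually. I first tried the largest signed 32-bit value (::pow(2.0, 31)) as the timeout, but unfortunately event_base_loopexit internally calls event_base_once, which calls event_add_internal with tv_is_absolute = 0. The timeout is therefore treated as an interval and added to the current time, so a value that large overflows. Subtracting the current time and a small margin from the maximum, as the tricky code above does, is just a quick fix.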

/* Implementation function to add an event.  Works just like event_add,
 * except: 1) it requires that we have the lock.  2) if tv_is_absolute is set,
 * we treat tv as an absolute time, not as an interval to add to the current
 * time */
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
    int tv_is_absolute)
{
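
The arithmetic behind the quick fix can be spelled out. Because the timeout is relative, the sum of the current time and the interval has to stay within the range of the seconds field, which I assume here to be a 32-bit signed value, as long is on Windows. This is a minimal sketch under that assumption. max_relative_timeout and deadline_fits are hypothetical helpers, not part of libevent, and the limit used is 2^31 - 1 rather than the 2^31 in the code above.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Largest value a 32-bit signed seconds field can hold (2^31 - 1).
const std::int64_t kMaxSeconds32 = std::numeric_limits<std::int32_t>::max();

// Hypothetical helper: the longest relative timeout, in seconds, that can be
// added to `now` without the absolute deadline overflowing a 32-bit field.
// `margin` leaves some headroom, like the 1000 in the quick fix.
inline std::int64_t max_relative_timeout(std::int64_t now, std::int64_t margin) {
    assert(now >= 0 && margin >= 0);
    const std::int64_t limit = kMaxSeconds32 - margin - now;
    return limit > 0 ? limit : 0;  // already past the limit: no time left
}

// True if now + interval still fits in the 32-bit field.
inline bool deadline_fits(std::int64_t now, std::int64_t interval) {
    return now + interval <= kMaxSeconds32;
}
```

For example, with now = 1300000000 the naive maximum interval does not fit, whereas max_relative_timeout(1300000000, 1000) yields 847482647, which does.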


event_config_set_num_cpus_hint sets the number of CPUs to 8 because I use a Core i7 with 8 SMT threads. Internally, libevent spawns (num_cpus * 2) threads in its thread pool for IOCP.

struct event_iocp_port *
event_iocp_port_launch(int n_cpus)
{
 struct event_iocp_port *port;
 int i;

 if (!extension_fns_initialized)
  init_extension_functions(&the_extension_fns);

 if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
  return NULL;

 if (n_cpus <= 0)
  n_cpus = N_CPUS_DEFAULT;
 port->n_threads = n_cpus * 2;
 port->threads = calloc(port->n_threads, sizeof(HANDLE));
 if (!port->threads)
  goto err;

 port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
   n_cpus);
 port->ms = -1;
 if (!port->port)
  goto err;

 port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
 if (!port->shutdownSemaphore)
  goto err;

 for (i=0; i<port->n_threads; ++i) {
  ev_uintptr_t th = _beginthread(loop, 0, port);
  if (th == (ev_uintptr_t)-1)
   goto err;
  port->threads[i] = (HANDLE)th;
  ++port->n_live_threads;
 }

 InitializeCriticalSectionAndSpinCount(&port->lock, 1000);

 return port;
err:
 if (port->port)
  CloseHandle(port->port);
 if (port->threads)
  mm_free(port->threads);
 if (port->shutdownSemaphore)
  CloseHandle(port->shutdownSemaphore);
 mm_free(port);
 return NULL;
}
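
Rather than hard-coding 8, the hint could be derived from the machine. The sketch below mirrors the sizing rule in event_iocp_port_launch and adds a detection helper. The fallback value 2 is my assumption, since the definition of N_CPUS_DEFAULT is not shown in the listing, and detected_cpu_hint is a hypothetical helper rather than libevent API.

```cpp
#include <cassert>
#include <thread>

// Assumed fallback when no usable hint is given. The real value is whatever
// N_CPUS_DEFAULT is defined as in the libevent source.
const int kAssumedDefaultCpus = 2;

// Mirrors the sizing in event_iocp_port_launch: two threads per hinted CPU.
inline int iocp_thread_count(int n_cpus_hint) {
    if (n_cpus_hint <= 0)
        n_cpus_hint = kAssumedDefaultCpus;
    const int threads = n_cpus_hint * 2;
    assert(threads > 0);  // guards against an absurdly large hint overflowing
    return threads;
}

// Hypothetical helper: ask the runtime for the number of hardware threads.
// hardware_concurrency() may return 0 when the count cannot be determined.
inline int detected_cpu_hint() {
    const unsigned n = std::thread::hardware_concurrency();
    return n > 0 ? static_cast<int>(n) : kAssumedDefaultCpus;
}
```

The result of detected_cpu_hint() could then be passed to event_config_set_num_cpus_hint. With a hint of 8, the pool gets 16 threads.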


The threads in the pool then block in GetQueuedCompletionStatus, which waits for I/O completion notifications.

static void
loop(void *_port)
{
 struct event_iocp_port *port = _port;
 long ms = port->ms;
 HANDLE p = port->port;

 if (ms <= 0)
  ms = INFINITE;

 while (1) {
  OVERLAPPED *overlapped=NULL;
  ULONG_PTR key=0;
  DWORD bytes=0;
  int ok = GetQueuedCompletionStatus(p, &bytes, &key,
   &overlapped, ms);
  EnterCriticalSection(&port->lock);
  if (port->shutdown) {
   if (--port->n_live_threads == 0)
    ReleaseSemaphore(port->shutdownSemaphore, 1,
      NULL);
   LeaveCriticalSection(&port->lock);
   return;
  }
  LeaveCriticalSection(&port->lock);

  if (key != NOTIFICATION_KEY && overlapped)
   handle_entry(overlapped, key, bytes, ok);
  else if (!overlapped)
   break;
 }
 event_warnx("GetQueuedCompletionStatus exited with no event.");
 EnterCriticalSection(&port->lock);
 if (--port->n_live_threads == 0)
  ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
 LeaveCriticalSection(&port->lock);
}
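
What each worker does after GetQueuedCompletionStatus returns can be condensed into a small decision function. This is only a model of the branches in loop above, written so it compiles without the Windows headers. The Action enum and classify_completion are hypothetical names, and the value used for the notification key is an assumption, because the definition of NOTIFICATION_KEY is not part of the listing.

```cpp
#include <cassert>
#include <cstdint>

// What a worker thread does with one dequeued completion packet.
enum class Action { ExitShutdown, Handle, ExitNoEvent, Ignore };

// Assumed placeholder for NOTIFICATION_KEY; the real value is not shown above.
const std::uintptr_t kNotificationKey = static_cast<std::uintptr_t>(-1);

// Same order of checks as in loop(): shutdown first, then the packet itself.
inline Action classify_completion(bool shutdown, std::uintptr_t key,
                                  const void* overlapped) {
    if (shutdown)
        return Action::ExitShutdown;   // port is closing: thread returns
    if (key != kNotificationKey && overlapped)
        return Action::Handle;         // real I/O completion: handle_entry
    if (!overlapped)
        return Action::ExitNoEvent;    // nothing dequeued: leave the loop
    return Action::Ignore;             // notification packet: wait again
}
```

Only the Handle case reaches handle_entry, which is why a breakpoint after GetQueuedCompletionStatus is a convenient place to see which operations really go through the port.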


So I built the test program and ran it. It worked. But when I set breakpoints in the debugger, it turned out that things were not working as expected: execution stopped at the line after GetQueuedCompletionStatus only when a new connection was accepted. I suspected that only the accept operation was overlapped.

static int
start_accepting(struct accepting_socket *as)
{
 /* requires lock */
 const struct win32_extension_fns *ext = event_get_win32_extension_fns();
 DWORD pending = 0;
 SOCKET s = socket(as->family, SOCK_STREAM, 0);
 if (s == INVALID_SOCKET)
  return -1;

 setsockopt(s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
     (char *)&as->lev->fd, sizeof(&as->lev->fd));

 if (!(as->lev->base.flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
  evutil_make_socket_nonblocking(s);

 if (event_iocp_port_associate(as->lev->port, s, 1) < 0) {
  closesocket(s);
  return -1;
 }

 as->s = s;

 if (ext->AcceptEx(as->lev->fd, s, as->addrbuf, 0,
  as->buflen/2, as->buflen/2, &pending, &as->overlapped.overlapped))
 {
  /* Immediate success! */
  accepted_socket_cb(&as->overlapped, 1, 0, 1);
 } else {
  int err = WSAGetLastError();
  if (err != ERROR_IO_PENDING) {
   event_warnx("AcceptEx: %s", evutil_socket_error_to_string(err));
   return -1;
  }
 }

 return 0;
}


In the debugger, AcceptEx is indeed called in start_accepting. The problem is that the read path uses the non-IOCP version of the read function.

evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
// snip

#ifdef WIN32
  {
   DWORD bytesRead;
   DWORD flags=0;
   if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
    /* The read failed. It might be a close,
     * or it might be an error. */
    if (WSAGetLastError() == WSAECONNABORTED)
     n = 0;
    else
     n = -1;
   } else
    n = bytesRead;
  }
#else
  n = readv(fd, vecs, nvecs);
#endif


int
evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
  struct event_overlapped *ol)
{
// snip

 _evbuffer_incref(buf);
 if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags,
      &ol->overlapped, NULL)) {
  int error = WSAGetLastError();
  if (error != WSA_IO_PENDING) {
   /* An actual error. */
   pin_release(buf_o, EVBUFFER_MEM_PINNED_R);
   evbuffer_unfreeze(buf, 0);
   evbuffer_free(buf); /* decref */
   goto done;
  }
 }


The latter passes an OVERLAPPED structure to initiate an overlapped read, but it was never actually called.
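
The two WSARecv calls differ only in whether an OVERLAPPED structure is passed, but that changes how the result has to be read: for an overlapped call, a failure whose error code is WSA_IO_PENDING means the operation was queued successfully rather than that it failed. Here is a minimal sketch of that interpretation, buildable without the Windows headers. StartResult and classify_overlapped_start are hypothetical names, and the constant 997 is the value of WSA_IO_PENDING (and ERROR_IO_PENDING) copied from the Windows headers.

```cpp
#include <cassert>

// Outcome of starting an overlapped operation such as WSARecv.
enum class StartResult { CompletedNow, Pending, Failed };

// Value of WSA_IO_PENDING / ERROR_IO_PENDING, repeated here so the sketch
// builds on any platform.
const int kIoPending = 997;

// rc is the return value of the WSARecv-style call (0 means success);
// last_error is what WSAGetLastError() reported right after a nonzero rc.
inline StartResult classify_overlapped_start(int rc, int last_error) {
    if (rc == 0)
        return StartResult::CompletedNow;  // finished immediately
    if (last_error == kIoPending)
        return StartResult::Pending;       // queued; completion arrives later
    return StartResult::Failed;            // an actual error
}
```

This matches the error handling in evbuffer_launch_read and start_accepting above, where only an error other than the pending code is treated as a real failure.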

I tracked down where it went wrong. The culprit was in http.c, which uses bufferevent_new, a function that doesn't support IOCP.

struct evhttp_connection *
evhttp_connection_base_new(struct event_base *base, struct evdns_base *dnsbase,
    const char *address, unsigned short port)
{
// snip

 if ((evcon->bufev = bufferevent_new(-1,
      evhttp_read_cb,
      evhttp_write_cb,
      evhttp_error_cb, evcon)) == NULL) {
  event_warn("%s: bufferevent_new failed", __func__);
  goto error;
 }


Instead, it has to use bufferevent_socket_new. But other parts of http.c would also have to be modified for that to work correctly, so I gave up there.

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
 struct bufferevent_private *bufev_p;
 struct bufferevent *bufev;

#ifdef WIN32
 if (base && event_base_get_iocp(base))
  return bufferevent_async_new(base, fd, options);
#endif


Though my time ran out, you can check whether the async read/write works by running iocp/bufferevent_async and the other regression tests defined in regress_iocp.c in the test folder. Don't forget to add the --no-fork option.

Will it be improved? I hope so. These IOCP details should be hidden from the user.

Another thing to look at is the thread pool. Since Windows 2000, the thread pool has been an OS component optimized for the inner workings of the OS. BindIoCompletionCallback has been available since Windows 2000, and CreateThreadpool and related APIs were added in Windows Vista. libevent still seems far from being fully optimized for Windows.

However, as an easy-going framework that saves time, it's already in good shape, as my example shows. A bit more polishing will make it shine.
