Current named pipe implementation does not have a queue for pending connections, similar to the *nix ‘listen(fd, 1)’.
This patch tries to tackle this drawback by creating a pool of instances. On each connect event we will retry to recreate the instances that were used. Creating a pool of instances means we cannot wait on the connection event anymore, since we do not have a single fd(handle) to be signaled. As a temporary solution, add a predefined timer to wake up and retry accepting connections. Further improvement can be done by adding an input/output completion port (https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx) over the pool instances and wait for it to be signaled. Setting up current IOCPs will require an additional effort. Signed-off-by: Alin Gabriel Serdean <[email protected]> --- lib/stream-windows.c | 139 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 87 insertions(+), 52 deletions(-) diff --git a/lib/stream-windows.c b/lib/stream-windows.c index 637920b..7e7932d 100644 --- a/lib/stream-windows.c +++ b/lib/stream-windows.c @@ -26,6 +26,8 @@ #include "stream-provider.h" #include "openvswitch/vlog.h" +#define MAX_NUMBER_OF_INSTANCES 64 + VLOG_DEFINE_THIS_MODULE(stream_windows); static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); @@ -138,7 +140,6 @@ windows_open(const char *name, char *suffix, struct stream **streamp, * connect function. Use overlapped flag and file no buffering to ensure * asynchronous operations. */ npipe = create_snpipe(connect_path); - if (npipe == INVALID_HANDLE_VALUE && GetLastError() == ERROR_PIPE_BUSY) { retry = true; } @@ -172,6 +173,9 @@ windows_open(const char *name, char *suffix, struct stream **streamp, s->write_pending = false; s->retry_connect = retry; *streamp = &s->stream; + if (retry) { + return EAGAIN; + } return 0; } @@ -326,7 +330,7 @@ windows_send(struct stream *stream, const void *buffer, size_t n) return -WSAECONNRESET; } else if (!result) { VLOG_ERR_RL(&rl, "Could not send data on synchronous named pipe. Last " - "error: %s", ovs_lasterror_to_string()); + "error: %s.", ovs_lasterror_to_string()); return -EINVAL; } return (retval > 0 ? retval : -EAGAIN); @@ -372,13 +376,14 @@ const struct stream_class windows_stream_class = { struct pwindows_pstream { struct pstream pstream; - HANDLE fd; + HANDLE fd[MAX_NUMBER_OF_INSTANCES]; /* Unlink path to be deleted during close. */ char *unlink_path; /* Overlapped operation used for connect. */ - OVERLAPPED connect; + OVERLAPPED connect[MAX_NUMBER_OF_INSTANCES]; /* Flag to check if an operation is pending. */ - bool pending; + bool pending[MAX_NUMBER_OF_INSTANCES]; + HANDLE hEvents[MAX_NUMBER_OF_INSTANCES]; /* String used to create the named pipe. */ char *pipe_path; }; @@ -393,7 +398,8 @@ pwindows_pstream_cast(struct pstream *pstream) } /* Create a named pipe with read/write access, overlapped, message mode for - * writing, byte mode for reading and with a maximum of 64 active instances. */ + * writing, byte mode for reading and with a maximum of MAX_NUMBER_OF_INSTANCES + * active instances. */ static HANDLE create_pnpipe(char *name) { @@ -406,8 +412,8 @@ create_pnpipe(char *name) return INVALID_HANDLE_VALUE; } return CreateNamedPipe(name, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT, - 64, BUFSIZE, BUFSIZE, 0, &sa); + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, 0, &sa); } /* Passive named pipe connect. This function creates a new named pipe and @@ -417,56 +423,68 @@ pwindows_accept(struct pstream *pstream, struct stream **new_streamp) { struct pwindows_pstream *p = pwindows_pstream_cast(pstream); DWORD last_error = 0; - DWORD cbRet; + DWORD ret_value; HANDLE npipe; + int cur_handle = 0; + + DWORD number = WaitForMultipleObjects(MAX_NUMBER_OF_INSTANCES, + p->hEvents, + FALSE, + 0); + + cur_handle = number - WAIT_OBJECT_0; + if (cur_handle < 0 || cur_handle >(MAX_NUMBER_OF_INSTANCES - 1)) { + return EAGAIN; + } + if (p->fd[cur_handle] == INVALID_HANDLE_VALUE) { + npipe = create_pnpipe(p->pipe_path); + if (npipe != INVALID_HANDLE_VALUE) { + p->fd[cur_handle]; + ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]); + } + return EAGAIN; + } /* If the connect operation was pending, verify the result. */ - if (p->pending) { - if (!GetOverlappedResult(p->fd, &p->connect, &cbRet, FALSE)) { + if (p->pending[cur_handle]) { + if (!GetOverlappedResult(p->fd, &p->connect[cur_handle], &ret_value, FALSE)) { last_error = GetLastError(); if (last_error == ERROR_IO_INCOMPLETE) { /* If the operation is still pending, retry again. */ - p->pending = true; + p->pending[cur_handle] = true; return EAGAIN; } else { VLOG_ERR_RL(&rl, "Could not connect named pipe. Last " "error: %s", ovs_lasterror_to_string()); - DisconnectNamedPipe(p->fd); - return EINVAL; + DisconnectNamedPipe(p->fd[cur_handle]); + ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]); + return EAGAIN; } } - p->pending = false; + p->pending[cur_handle] = false; } - if (!p->pending && !ConnectNamedPipe(p->fd, &p->connect)) { + if (!p->pending[cur_handle] && !ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle])) { last_error = GetLastError(); if (last_error == ERROR_IO_PENDING) { /* Mark the accept operation as pending. */ - p->pending = true; + p->pending[cur_handle] = true; return EAGAIN; } else if (last_error != ERROR_PIPE_CONNECTED) { VLOG_ERR_RL(&rl, "Could not connect synchronous named pipe. Last " "error: %s", ovs_lasterror_to_string()); - DisconnectNamedPipe(p->fd); - return EINVAL; - } else { - /* If the pipe is connected, signal an event. */ - SetEvent(&p->connect.hEvent); + ResetEvent(p->connect[cur_handle].hEvent); + DisconnectNamedPipe(p->fd[cur_handle]); + ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]); + return EAGAIN; } } - npipe = create_pnpipe(p->pipe_path); - if (npipe == INVALID_HANDLE_VALUE) { - VLOG_ERR_RL(&rl, "Could not create a new named pipe after connect. ", - ovs_lasterror_to_string()); - return ENOENT; - } - /* Give the handle p->fd to the new created active stream and specify it * was created by an active stream. */ struct windows_stream *p_temp = xmalloc(sizeof *p_temp); stream_init(&p_temp->stream, &windows_stream_class, 0, "unix"); - p_temp->fd = p->fd; + p_temp->fd = p->fd[cur_handle]; /* Specify it was created by a passive stream. */ p_temp->server = true; /* Create events for read/write operations. */ @@ -481,10 +499,18 @@ pwindows_accept(struct pstream *pstream, struct stream **new_streamp) *new_streamp = &p_temp->stream; /* The passive handle p->fd will be the new created handle. */ - p->fd = npipe; - memset(&p->connect, 0, sizeof(p->connect)); - p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); - p->pending = false; + npipe = create_pnpipe(p->pipe_path); + p->fd[cur_handle] = npipe; + CloseHandle(p->connect[cur_handle].hEvent); + memset(&p->connect[cur_handle], 0, sizeof(p->connect[cur_handle])); + p->pending[cur_handle] = false; + if (npipe != INVALID_HANDLE_VALUE) { + ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]); + p->connect[cur_handle].hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); + } else { + p->connect[cur_handle].hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + } + p->hEvents[cur_handle] = p->connect[cur_handle].hEvent; return 0; } @@ -493,9 +519,11 @@ static void pwindows_close(struct pstream *pstream) { struct pwindows_pstream *p = pwindows_pstream_cast(pstream); - DisconnectNamedPipe(p->fd); - CloseHandle(p->fd); - CloseHandle(p->connect.hEvent); + for (int i = 0; i <= MAX_NUMBER_OF_INSTANCES; i++){ + DisconnectNamedPipe(p->fd[i]); + CloseHandle(p->fd[i]); + CloseHandle(p->connect[i].hEvent); + } maybe_unlink_and_free(p->unlink_path); free(p->pipe_path); free(p); @@ -505,8 +533,8 @@ pwindows_close(struct pstream *pstream) static void pwindows_wait(struct pstream *pstream) { - struct pwindows_pstream *p = pwindows_pstream_cast(pstream); - poll_wevent_wait(p->connect.hEvent); + long long now = time_msec(); + poll_timer_wait_until(now + 16); } /* Passive named pipe. */ @@ -515,7 +543,7 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp, uint8_t dscp OVS_UNUSED) { char *bind_path; - int error; + int error, i; HANDLE npipe; char *orig_path; @@ -543,22 +571,29 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix, bind_path = xasprintf("%s%s", LOCAL_PREFIX, remove_slashes(path)); free(path); - npipe = create_pnpipe(bind_path); - - if (npipe == INVALID_HANDLE_VALUE) { - VLOG_ERR_RL(&rl, "Could not create named pipe. Last error: %s", - ovs_lasterror_to_string()); - return ENOENT; - } - struct pwindows_pstream *p = xmalloc(sizeof *p); pstream_init(&p->pstream, &pwindows_pstream_class, name); - p->fd = npipe; p->unlink_path = orig_path; - memset(&p->connect, 0, sizeof(p->connect)); - p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); - p->pending = false; p->pipe_path = bind_path; + for (i = 0; i < MAX_NUMBER_OF_INSTANCES; i++) { + npipe = create_pnpipe(bind_path); + if (npipe == INVALID_HANDLE_VALUE) { + VLOG_DBG_RL(&rl, "Could not create named pipe. Last error: %s", + ovs_lasterror_to_string()); + break; + } + p->fd[i] = npipe; + memset(&p->connect[i], 0, sizeof(p->connect[i])); + p->connect[i].hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); + p->pending[i] = false; + p->hEvents[i] = p->connect[i].hEvent; + ConnectNamedPipe(p->fd[i], &p->connect[i]); + } + if (i == 0) { + free(p); + return ENOENT; + } + *pstreamp = &p->pstream; return 0; } -- 2.10.2.windows.1 _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
