This patch allows the remote driver to work with an asynchronous
EventImpl (it's the only one using an externally-supplied one), assuming
libvirt is compiled with pthread support. (Without pthreads, this code
is harmless in a single-threaded environment.)
Basically it uses a mutex to protect reads from the RPC socket in such a
way that message reads (in their entirety) are done atomically
(otherwise the remoteDomainEventFired() can race the call() code that
reads replies & events).
In addition, I update the EventImpl handle to prevent
remoteDomainEventFired() from being called everytime a reply is sent.
(This helps us dispatch events in a timely manner, though it's not
strictly necessary. Without it, any events coming in during a call()
won't be dispatched until the call drops the socket lock (because
remoteDomainEventFired() will be stuck awaiting the lock).
Dave
diff --git a/src/remote_internal.c b/src/remote_internal.c
index 2ca7930..59128f6 100644
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -116,6 +116,7 @@ struct private_data {
virDomainEventQueuePtr domainEvents;
/* Timer for flushing domainEvents queue */
int eventFlushTimer;
+ PTHREAD_MUTEX_T(lock); /* Serializes socket reads w/async EventImpl */
};
#define GET_PRIVATE(conn,retcode) \
@@ -700,6 +701,9 @@ doRemoteOpen (virConnectPtr conn,
} /* switch (transport) */
+ /* This must precede the first call() */
+ priv->eventFlushTimer = -1;
+
/* Try and authenticate with server */
if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
goto failed;
@@ -744,6 +748,8 @@ doRemoteOpen (virConnectPtr conn,
}
}
+ pthread_mutex_init(&priv->lock, NULL);
+
if(VIR_ALLOC(priv->callbackList)<0) {
error(conn, VIR_ERR_INVALID_ARG, _("Error allocating callbacks list"));
goto failed;
@@ -1250,6 +1256,8 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
/* Free queued events */
virDomainEventQueueFree(priv->domainEvents);
+ pthread_mutex_destroy(&priv->lock);
+
return 0;
}
@@ -4536,11 +4544,11 @@ static int really_read (virConnectPtr conn, struct private_data *priv,
* else Bad Things will happen in the XDR code.
*/
static int
-call (virConnectPtr conn, struct private_data *priv,
- int flags /* if we are in virConnectOpen */,
- int proc_nr,
- xdrproc_t args_filter, char *args,
- xdrproc_t ret_filter, char *ret)
+really_call (virConnectPtr conn, struct private_data *priv,
+ int flags /* if we are in virConnectOpen */,
+ int proc_nr,
+ xdrproc_t args_filter, char *args,
+ xdrproc_t ret_filter, char *ret)
{
char buffer[REMOTE_MESSAGE_MAX];
char buffer2[4];
@@ -4596,16 +4604,18 @@ call (virConnectPtr conn, struct private_data *priv,
really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
return -1;
+ pthread_mutex_lock(&priv->lock);
+
retry_read:
/* Read and deserialise length word. */
if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
- return -1;
+ goto unlock_return_err;
xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
if (!xdr_int (&xdr, &len)) {
error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
VIR_ERR_RPC, _("xdr_int (length word, reply)"));
- return -1;
+ goto unlock_return_err;
}
xdr_destroy (&xdr);
@@ -4615,12 +4625,14 @@ retry_read:
if (len < 0 || len > REMOTE_MESSAGE_MAX) {
error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
VIR_ERR_RPC, _("packet received from server too large"));
- return -1;
+ goto unlock_return_err;
}
/* Read reply header and what follows (either a ret or an error). */
if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1)
- return -1;
+ goto unlock_return_err;
+
+ pthread_mutex_unlock(&priv->lock);
/* Deserialise reply header. */
xdrmem_create (&xdr, buffer, len, XDR_DECODE);
@@ -4729,8 +4741,33 @@ retry_read:
xdr_destroy (&xdr);
return -1;
}
+
+ unlock_return_err:
+ pthread_mutex_unlock(&priv->lock);
+ return -1;
+}
+
+static int call (virConnectPtr conn, struct private_data *priv,
+ int flags /* if we are in virConnectOpen */,
+ int proc_nr,
+ xdrproc_t args_filter, char *args,
+ xdrproc_t ret_filter, char *ret)
+{
+ int rv;
+ if (priv->eventFlushTimer >= 0)
+ virEventUpdateHandle(priv->sock, 0);
+ rv = really_call(conn, priv, flags, proc_nr,
+ args_filter, args,
+ ret_filter, ret);
+ if (priv->eventFlushTimer >= 0)
+ virEventUpdateHandle(priv->sock,
+ VIR_EVENT_HANDLE_READABLE |
+ VIR_EVENT_HANDLE_ERROR |
+ VIR_EVENT_HANDLE_HANGUP);
+ return rv;
}
+
static int
really_write_buf (virConnectPtr conn, struct private_data *priv,
int in_open /* if we are in virConnectOpen */,
@@ -5287,14 +5324,16 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
return;
}
+ pthread_mutex_lock(&priv->lock);
+
/* Read and deserialise length word. */
if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
- return;
+ goto unlock_and_return;
xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
if (!xdr_int (&xdr, &len)) {
error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
- return;
+ goto unlock_and_return;
}
xdr_destroy (&xdr);
@@ -5303,15 +5342,17 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
if (len < 0 || len > REMOTE_MESSAGE_MAX) {
error (conn, VIR_ERR_RPC, _("packet received from server too large"));
- return;
+ goto unlock_and_return;
}
/* Read reply header and what follows (either a ret or an error). */
if (really_read (conn, priv, 0, buffer, len) == -1) {
error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
- return;
+ goto unlock_and_return;
}
+ pthread_mutex_unlock(&priv->lock);
+
/* Deserialise reply header. */
xdrmem_create (&xdr, buffer, len, XDR_DECODE);
if (!xdr_remote_message_header (&xdr, &hdr)) {
@@ -5327,6 +5368,11 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
DEBUG0("invalid proc in event firing");
error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
}
+
+ return;
+
+ unlock_and_return:
+ pthread_mutex_unlock(&priv->lock);
}
void
--
Libvir-list mailing list
[email protected]
https://www.redhat.com/mailman/listinfo/libvir-list