On Mon, 2011-03-07 at 14:10 -0600, Michael Roth wrote: > This implements the state machine/logic used to manage > send/receive/execute phases of RPCs we send or receive. It does so using > a set of abstract methods we implement with the application and > transport level code which will follow. > > Signed-off-by: Michael Roth <mdr...@linux.vnet.ibm.com> > --- > virtagent-manager.c | 326 > +++++++++++++++++++++++++++++++++++++++++++++++++++ > virtagent-manager.h | 130 ++++++++++++++++++++ > 2 files changed, 456 insertions(+), 0 deletions(-) > create mode 100644 virtagent-manager.c > create mode 100644 virtagent-manager.h > > diff --git a/virtagent-manager.c b/virtagent-manager.c > new file mode 100644 > index 0000000..51d26a3 > --- /dev/null > +++ b/virtagent-manager.c > @@ -0,0 +1,326 @@ > +/* > + * virtagent - job queue management > + * > + * Copyright IBM Corp. 2011 > + * > + * Authors: > + * Michael Roth <mdr...@linux.vnet.ibm.com> > + * > + * This work is licensed under the terms of the GNU GPL, version 2 or later. > + * See the COPYING file in the top-level directory. > + * > + */ > + > +#include "virtagent-common.h" > + > +typedef struct VAServerJob { > + char tag[64]; > + void *opaque; > + VAServerJobOps ops; > + QTAILQ_ENTRY(VAServerJob) next; > + enum { > + VA_SERVER_JOB_STATE_NEW = 0, > + VA_SERVER_JOB_STATE_BUSY, > + VA_SERVER_JOB_STATE_EXECUTED, > + VA_SERVER_JOB_STATE_SENT, > + VA_SERVER_JOB_STATE_DONE, > + } state; > +} VAServerJob; > + > +typedef struct VAClientJob { > + char tag[64]; > + void *opaque; > + void *resp_opaque; > + VAClientJobOps ops; > + QTAILQ_ENTRY(VAClientJob) next; > + enum { > + VA_CLIENT_JOB_STATE_NEW = 0, > + VA_CLIENT_JOB_STATE_BUSY, > + VA_CLIENT_JOB_STATE_SENT, > + VA_CLIENT_JOB_STATE_READ, > + VA_CLIENT_JOB_STATE_DONE, > + } state; > +} VAClientJob; > + > +#define SEND_COUNT_MAX 1 > +#define EXECUTE_COUNT_MAX 4
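It's not immediately clear what the difference between SEND_COUNT_MAX and EXECUTE_COUNT_MAX is. Some comments would help. Also, will the code work if these numbers are changed? If not, a note about what someone needs to look at when changing these would seem appropriate. Something along these lines, maybe (the comment text is just my guess at the intent, so treat it as a sketch):

    /* max number of sends (client requests or server responses) that may
     * be in flight on the channel at once; audit va_process_server_job()
     * and va_process_client_job() before raising this */
    #define SEND_COUNT_MAX 1
    /* max number of server jobs that may be executing concurrently */
    #define EXECUTE_COUNT_MAX 4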
> + > +struct VAManager { > + int send_count; /* sends in flight */ > + int execute_count; /* number of jobs currently executing */ > + QTAILQ_HEAD(, VAServerJob) server_jobs; > + QTAILQ_HEAD(, VAClientJob) client_jobs; > +}; > + > +/* server job operations/helpers */ > + > +static VAServerJob *va_server_job_by_tag(VAManager *m, const char *tag) > +{ > + VAServerJob *j; > + QTAILQ_FOREACH(j, &m->server_jobs, next) { > + if (strcmp(j->tag, tag) == 0) { > + return j; > + } > + } > + return NULL; > +} > + > +int va_server_job_add(VAManager *m, const char *tag, void *opaque, > + VAServerJobOps ops) > +{ > + VAServerJob *j = qemu_mallocz(sizeof(VAServerJob)); > + TRACE("called"); QEMU has a good tracing infrastructure. If this trace point is useful enough to keep around, it should use that instead (see sketch 2 at the end of this mail). If it's not that important, I'd remove it entirely. I believe this has been flagged in an earlier RFC too. > + j->state = VA_SERVER_JOB_STATE_NEW; > + j->ops = ops; > + j->opaque = opaque; > + memset(j->tag, 0, 64); > + pstrcpy(j->tag, 63, tag); Magic numbers. Should use something like #define TAG_LEN 64 (see sketch 1 at the end of this mail). > + QTAILQ_INSERT_TAIL(&m->server_jobs, j, next); > + va_kick(m); > + return 0; > +} > + > +static void va_server_job_execute(VAServerJob *j) > +{ > + TRACE("called"); > + j->state = VA_SERVER_JOB_STATE_BUSY; > + j->ops.execute(j->opaque, j->tag); > +} > + > +/* TODO: need a way to pass information back */ > +void va_server_job_execute_done(VAManager *m, const char *tag) > +{ > + VAServerJob *j = va_server_job_by_tag(m, tag); > + TRACE("called"); > + if (!j) { > + LOG("server job with tag \"%s\" not found", tag); > + return; > + } > + j->state = VA_SERVER_JOB_STATE_EXECUTED; > + va_kick(m); > +} > + > +static void va_server_job_send(VAServerJob *j) > +{ > + TRACE("called"); > + j->state = VA_SERVER_JOB_STATE_BUSY; > + j->ops.send(j->opaque, j->tag); > +} > + > +void va_server_job_send_done(VAManager *m, const char *tag) > +{ > + VAServerJob *j = va_server_job_by_tag(m, tag); > + TRACE("called"); > + if (!j) { > + LOG("server job with tag \"%s\" not found", tag); > + return; > + } > + j->state = VA_SERVER_JOB_STATE_SENT; > + m->send_count--; > + va_kick(m); > +} > + > +static void va_server_job_callback(VAServerJob *j) > +{ > + TRACE("called"); > + j->state = VA_SERVER_JOB_STATE_BUSY; > + if (j->ops.callback) { > + j->ops.callback(j->opaque, j->tag); > + } > + j->state = VA_SERVER_JOB_STATE_DONE; > +} > + > +void va_server_job_cancel(VAManager *m, const char *tag) > +{ > + VAServerJob *j = va_server_job_by_tag(m, tag); > + TRACE("called"); > + if (!j) { > + LOG("server job with tag \"%s\" not found", tag); > + return; > + } > + /* TODO: need to decrement sends/execs in flight appropriately */ > + /* make callback and move to done state, kick() will handle cleanup */ > + va_server_job_callback(j); > + va_kick(m); > +}
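The TODO here looks important: if a job is cancelled while its send is still in flight, send_count is never decremented, and with SEND_COUNT_MAX at 1 the manager would stop issuing sends altogether. One wrinkle is that VA_SERVER_JOB_STATE_BUSY covers both the execute and the send phases, so the state alone doesn't say which count needs fixing up. Roughly (just a sketch, the phase field is made up):

    /* in VAServerJob, set by va_server_job_execute()/va_server_job_send() */
    enum { VA_PHASE_NONE, VA_PHASE_EXECUTE, VA_PHASE_SEND } phase;

    /* in va_server_job_cancel(), before making the callback */
    if (j->state == VA_SERVER_JOB_STATE_BUSY && j->phase == VA_PHASE_SEND) {
        m->send_count--;
    }

The client job cancel path below has the same issue.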
> + > +/* client job operations */ > + > +static VAClientJob *va_client_job_by_tag(VAManager *m, const char *tag) > +{ > + VAClientJob *j; > + QTAILQ_FOREACH(j, &m->client_jobs, next) { > + if (strcmp(j->tag, tag) == 0) { > + return j; > + } > + } > + return NULL; > +} > + > +int va_client_job_add(VAManager *m, const char *tag, void *opaque, > + VAClientJobOps ops) > +{ > + VAClientJob *j = qemu_mallocz(sizeof(VAClientJob)); > + TRACE("called"); > + j->ops = ops; > + j->opaque = opaque; > + memset(j->tag, 0, 64); > + pstrcpy(j->tag, 63, tag); > + QTAILQ_INSERT_TAIL(&m->client_jobs, j, next); > + va_kick(m); > + return 0; > +} > + > +static void va_client_job_send(VAClientJob *j) > +{ > + TRACE("called"); > + j->state = VA_CLIENT_JOB_STATE_BUSY; > + j->ops.send(j->opaque, j->tag); > +} > + > +void va_client_job_send_done(VAManager *m, const char *tag) > +{ > + VAClientJob *j = va_client_job_by_tag(m, tag); > + TRACE("called"); > + if (!j) { > + LOG("client job with tag \"%s\" not found", tag); > + return; > + } > + j->state = VA_CLIENT_JOB_STATE_SENT; > + m->send_count--; > + va_kick(m); > +} > + > +void va_client_job_read_done(VAManager *m, const char *tag, void *resp) > +{ > + VAClientJob *j = va_client_job_by_tag(m, tag); > + TRACE("called"); > + if (!j) { > + LOG("client job with tag \"%s\" not found", tag); > + return; > + } > + j->state = VA_CLIENT_JOB_STATE_READ; > + j->resp_opaque = resp; > + va_kick(m); > +} > + > +static void va_client_job_callback(VAClientJob *j) > +{ > + TRACE("called"); > + j->state = VA_CLIENT_JOB_STATE_BUSY; > + if (j->ops.callback) { > + j->ops.callback(j->opaque, j->resp_opaque, j->tag); > + } > + j->state = VA_CLIENT_JOB_STATE_DONE; > +} > + > +void va_client_job_cancel(VAManager *m, const char *tag) > +{ > + VAClientJob *j = va_client_job_by_tag(m, tag); > + TRACE("called"); > + if (!j) { > + LOG("client job with tag \"%s\" not found", tag); > + return; > + } > + /* TODO: need to decrement sends/execs in flight appropriately */ > + /* make callback and move to done state, kick() will handle cleanup */ > + va_client_job_callback(j); > + va_kick(m); > +} > + > +/* general management functions */ > + > +VAManager *va_manager_new(void) > +{ > + VAManager *m = qemu_mallocz(sizeof(VAManager)); > + QTAILQ_INIT(&m->client_jobs); > + QTAILQ_INIT(&m->server_jobs); > + return m; > +} > + > +static void va_process_server_job(VAManager *m, VAServerJob *sj) > +{ > + switch (sj->state) { > + case VA_SERVER_JOB_STATE_NEW: > + TRACE("marker"); > + va_server_job_execute(sj); > + break; > + case VA_SERVER_JOB_STATE_EXECUTED: > + TRACE("marker"); > + if (m->send_count < SEND_COUNT_MAX) { > + TRACE("marker"); > + va_server_job_send(sj); > + m->send_count++; > + } > + break; > + case VA_SERVER_JOB_STATE_SENT: > + TRACE("marker"); > + va_server_job_callback(sj); > + break; > + case VA_SERVER_JOB_STATE_BUSY: > + TRACE("marker, server job currently busy"); > + break; > + case VA_SERVER_JOB_STATE_DONE: > + TRACE("marker"); > + QTAILQ_REMOVE(&m->server_jobs, sj, next); > + break; > + default: > + LOG("error, unknown server job state"); > + break; > + } > +} > + > +static void va_process_client_job(VAManager *m, VAClientJob *cj) > +{ > + switch (cj->state) { > + case VA_CLIENT_JOB_STATE_NEW: > + TRACE("marker"); > + if (m->send_count < SEND_COUNT_MAX) { > + TRACE("marker"); > + va_client_job_send(cj); > + m->send_count++; > + } > + break; > + case VA_CLIENT_JOB_STATE_SENT: > + TRACE("marker"); > + //nothing to do here, awaiting read_done() > + break; > + case VA_CLIENT_JOB_STATE_READ: > + TRACE("marker"); > + va_client_job_callback(cj); > + break; > + case VA_CLIENT_JOB_STATE_DONE: > + TRACE("marker"); > + QTAILQ_REMOVE(&m->client_jobs, cj, next); > + break; > + case VA_CLIENT_JOB_STATE_BUSY: > + TRACE("marker, client job currently busy"); > + break; > + default: > + LOG("error, unknown client job state"); > + break; > + } > +} > + > +void va_kick(VAManager *m) > +{
+ VAServerJob *sj, *sj_tmp; > + VAClientJob *cj, *cj_tmp; > + > + TRACE("called"); > + TRACE("send_count: %u, execute_count: %u", m->send_count, > m->execute_count); > + > + /* TODO: make sure there is no starvation of jobs/operations here */ > + > + /* look for any work to be done among pending server jobs */ > + QTAILQ_FOREACH_SAFE(sj, &m->server_jobs, next, sj_tmp) { > + TRACE("marker, server tag: %s", sj->tag); > + va_process_server_job(m, sj); > + } > + > + /* look for work to be done among pending client jobs */ > + QTAILQ_FOREACH_SAFE(cj, &m->client_jobs, next, cj_tmp) { > + TRACE("marker, client tag: %s", cj->tag); > + va_process_client_job(m, cj); > + } > +} > diff --git a/virtagent-manager.h b/virtagent-manager.h > new file mode 100644 > index 0000000..7b463fb > --- /dev/null > +++ b/virtagent-manager.h > @@ -0,0 +1,130 @@ > +#ifndef VIRTAGENT_MANAGER_H > +#define VIRTAGENT_MANAGER_H > + > +#include "qemu-common.h" > +#include "qemu-queue.h" > + > +/* > + * Protocol Overview: > + * > + * The virtagent protocol depends on a state machine to manage communication > + * over a single connection stream, currently a virtio or isa serial channel. > + * The basic characterization of the work being done is that clients > + * send/handle client jobs locally, which are then read/handled remotely as > + * server jobs. A client job consists of a request which is sent, and a > + * response which is eventually recieved. A server job consists of a request > + * which is recieved from the other end, and a response which is sent back. "i before e, except after c ..." (I misspell receive all the time too). > + * > + * Server jobs are given priority over client jobs, i.e. if we send a client > + * job (our request) and recieve a server job (their request), rather than > + * await a response to the client job, we immediately begin processing the > + * server job and then send back the response. This prevents us from being > + * deadlocked in a situation where both sides have sent a client job and are > + * awaiting the response before handling the other side's client job. > + * > + * Multiple in-flight requests are supported, but high request rates can > + * potentially starve out the other side's client jobs / requests, so we'll > + * behaved participants should periodically backoff on high request rates, or > + * limit themselves to 1 request at a time (anything more than 1 can still > + * potentionally remove any window for the other end to service it's own > + * client jobs, since we can begin sending the next request before it begins > + * send the response for the 2nd). > + * > + * On a related note, in the future, bidirectional user/session-level guest > + * agents may also be supported via a forwarding service made available > + * through the system-level guest agent. In this case it is up to the > + * system-level agent to handle forwarding requests in such a way that we > + * don't starve the host-side service out sheerly by having too many > + * sessions/users trying to send RPCs at a constant rate. This would be > + * supported through this job Manager via an additional "forwarder" job type. > + * > + * To encapsulate some of this logic, we define here a "Manager" class, which > + * provides an abstract interface to a state machine which handles most of > + * the above logic transparently to the transport/application-level code. > + * This also makes it possible to utilize alternative > + * transport/application-level protocols in the future. 
> + * > + */ > + > +/* > + * Two types of jobs are generated from various components of virtagent. > + * Each job type has a priority, and a set of prioritized functions as well. > + * > + * The read handler generates new server jobs as it recieves requests from > + * the channel. Server jobs make progress through the following operations. > + * > + * EXECUTE->EXECUTE_DONE->SEND->SEND_DONE > + * > + * EXECUTE (provided by user, manager calls) > + * When server jobs are added, eventually (as execution slots become > + * available) an execute() will be called to begin executing the job. An > + * error value will be returned if there is no room in the queue for another > + * server job. > + * > + * EXECUTE_DONE (provided by manager, user calls) > + * As server jobs complete, execute_completed() is called to update execution > + * status of that job (failure/success), inject the payload, and kick off the > + * next operation. > + * > + * SEND (provided by user, manager calls) > + * Eventually the send() operation is made. This will cause the send handler > + * to begin sending the response. > + * > + * SEND_DONE (provided by manager, user calls) > + * Upon completion of that send, the send_completed() operation will be > + * called. This will free up the job, and kick off the next operation. > + */ Very helpful protocol overview. Thanks for adding this. It might be worth pairing it with a short usage example so transport/application authors can see the flow at a glance; there's a rough sketch at the end of this mail (sketch 3). > +typedef int (va_job_op)(void *opaque, const char *tag); > +typedef struct VAServerJobOps { > + va_job_op *execute; > + va_job_op *send; > + va_job_op *callback; > +} VAServerJobOps; > + > +/* > + * The client component generates new client jobs as they're made by > + * virtagent in response to monitored events or user-issued commands. > + * Client jobs progress via the following operations. > + * > + * SEND->SEND_DONE->READ_DONE > + * > + * SEND (provided by user, called by manager) > + * After client jobs are added, send() will eventually be called to queue > + * the job up for xmit over the channel. > + * > + * SEND_DONE (provided by manager, called by user) > + * Upon completion of the send, send_completed() should be called with > + * failure/success indication. > + * > + * READ_DONE (provided by manager, called by user) > + * When a response for the request is read back via the transport layer, > + * read_done() will be called by the user to indicate success/failure, > + * inject the response, and make the associated callback. > + */ > +typedef int (va_client_job_cb)(void *opaque, void *resp_opaque, > + const char *tag); > +typedef struct VAClientJobOps { > + va_job_op *send; > + va_client_job_cb *callback; > +} VAClientJobOps; > + > +typedef struct VAManager VAManager; > + > +VAManager *va_manager_new(void); > +void va_kick(VAManager *m); > + > +/* interfaces for server jobs */ > +int va_server_job_add(VAManager *m, const char *tag, void *opaque, > + VAServerJobOps ops); > +void va_server_job_execute_done(VAManager *m, const char *tag); > +void va_server_job_send_done(VAManager *m, const char *tag); > +void va_server_job_cancel(VAManager *m, const char *tag); > + > +/* interfaces for client jobs */ > +int va_client_job_add(VAManager *m, const char *tag, void *opaque, > + VAClientJobOps ops); > +void va_client_job_cancel(VAManager *m, const char *tag); > +void va_client_job_send_done(VAManager *m, const char *tag); > +void va_client_job_read_done(VAManager *m, const char *tag, void *resp); > + > +#endif /* VIRTAGENT_MANAGER_H */
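To make the suggestions above concrete, here are the sketches I referred to. All of this is untested, and any names I introduce (the trace event, my_send/my_callback and friends) are placeholders rather than anything that exists today.

Sketch 1, the tag length constant:

    #define TAG_LEN 64

    typedef struct VAServerJob {
        char tag[TAG_LEN];
        /* ...rest of the fields as before... */
    } VAServerJob;

    /* pstrcpy() NUL-terminates within the size it's given, so the
     * memset() shouldn't be needed */
    pstrcpy(j->tag, sizeof(j->tag), tag);

Sketch 2, using the tracing infrastructure in place of TRACE("called"): add an event to the trace-events file and call the generated helper, e.g.

    # in trace-events
    va_server_job_add(const char *tag) "tag %s"

    /* in virtagent-manager.c */
    #include "trace.h"
    /* ...then in va_server_job_add(): */
    trace_va_server_job_add(tag);

Sketch 3, a usage example that could go in the header, based on my reading of the client ops (function names made up):

    static int my_send(void *opaque, const char *tag)
    {
        /* hand the request to the transport layer, which calls
         * va_client_job_send_done(m, tag) once the write completes */
        return 0;
    }

    static int my_callback(void *opaque, void *resp_opaque, const char *tag)
    {
        /* resp_opaque is the response injected by va_client_job_read_done() */
        return 0;
    }

    VAClientJobOps ops = {
        .send = my_send,
        .callback = my_callback,
    };
    va_client_job_add(m, "tag0", request, ops);
    /* the transport layer then reports progress via: */
    va_client_job_send_done(m, "tag0");
    va_client_job_read_done(m, "tag0", response);

-- 
Thanks,
Adam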