Author: ArcRiley Date: 2009-01-09 05:19:52 -0500 (Fri, 09 Jan 2009) New Revision: 1451
Modified: trunk/concordance/src/Core.c trunk/concordance/src/Core.h Log: callback can now respond Modified: trunk/concordance/src/Core.c =================================================================== --- trunk/concordance/src/Core.c 2009-01-09 09:33:57 UTC (rev 1450) +++ trunk/concordance/src/Core.c 2009-01-09 10:19:52 UTC (rev 1451) @@ -25,6 +25,7 @@ static gpointer conCore_loop (gpointer); static int conCore_listenChannel (conCoreObject*, conChannel*); static gboolean conCore_listenNew (GIOChannel*, GIOCondition, gpointer); +void conCore_queuePullNew (conCoreObject* self); static gboolean conCore_sessionRead (GIOChannel*, GIOCondition, gpointer); static gboolean conCore_sessionWrite (GIOChannel*, GIOCondition, gpointer); static void conCore_xmlStart (gpointer, const XML_Char*, @@ -56,6 +57,7 @@ */ self->queueRecv = g_async_queue_new(); self->queueSend = g_async_queue_new(); + } return (PyObject*) self; } @@ -148,6 +150,9 @@ */ self->context = g_main_context_new(); + /* temporary */ + conCore_queuePullNew(self); + /* open the listening channels Note that the sockets default to -1 and remain -1 on failure, the @@ -249,7 +254,7 @@ PyObject* sinput; PyObject* tinput; PyObject* output; - GString* popped; + conCoreQueueMsg* popped; GTimeVal popEnd; /* ensure there are no arguments save self */ @@ -259,17 +264,17 @@ /* loop until a signal is raised int PyErr_CheckSignals (); - Returns 0 or -1 if there's a signal + Returns 0 normally, or -1 if a signal has been raised. */ while (PyErr_CheckSignals() == 0) { - /* calculate end time to wait for next callback + /* calculate end time to wait for next callback, 0.5 seconds ahead void g_get_current_time (GTimeVal *result); void g_time_val_add (GTimeVal *time_, glong microseconds); */ g_get_current_time(&popEnd); - g_time_val_add(&popEnd, 10000); + g_time_val_add(&popEnd, 50000); /* release the GIL while waiting for the next callback @@ -290,7 +295,8 @@ PyObject* PyTuple_Pack (Py_ssize_t n, ...) */ - sinput = PyUnicode_FromStringAndSize(popped->str, popped->len); + sinput = PyUnicode_FromStringAndSize(popped->message->str, + popped->message->len); tinput = PyTuple_Pack(1, sinput); /* call our handler @@ -303,19 +309,25 @@ handle = PyObject_GetAttrString(s, "clientHandle"); output = PyEval_CallObject(handle, tinput); - /* do nothing with the output, yet. we need a GMainLoop event for it */ + /* reset popped->message to reuse with response, convert utf16 to utf8 - /* free the objects we created and the GString* we popped + GString* g_string_assign (GString *string, + const gchar *rval); + void g_async_queue_push (GAsyncQueue *queue, + gpointer data); + */ + popped->message = g_string_assign(popped->message, + conPyUnicodeToUTF8(output)); + g_async_queue_push(self->queueSend, popped); + /* free the objects we're done with + void Py_DECREF (PyObject *o); - gchar* g_string_free (GString *string, - gboolean free_segment); */ Py_DECREF(output); Py_DECREF(handle); Py_DECREF(tinput); Py_DECREF(sinput); - g_string_free(popped, TRUE); } /* signal was raised, return with error condition */ @@ -516,6 +528,126 @@ # ########################################################################### # + # Callback Queue Handling + # */ + static void + conCore_queuePush(conSession* session) { /*\ + cdef : \*/ + conCoreObject* self = session->core; + conCoreQueueMsg* push; + + /* alloc push */ + push = g_malloc(sizeof(conCoreQueueMsg)); + + + /* we're pushing our element buffer onto the queue, so we need a new one + + GString* g_string_new (const gchar *init); + */ + push->message = session->ebuff; + session->ebuff = g_string_new(""); + push->session = session; + + /* push message onto the callback queue + + void g_async_queue_push (GAsyncQueue *queue, + gpointer data); + */ + g_async_queue_push(self->queueRecv, push); + } + + + gboolean + conCore_queuePullPrepare(GSource* source, gint* timeout) { /*\ + cdef : \*/ + conCoreObject* self = ((conCoreQueueSource*) source)->self; + + /* poll timeout 1/25th of a second */ + *timeout = 40; + + /* return TRUE if there is data queued + + gint g_async_queue_length (GAsyncQueue *queue); + */ + return (g_async_queue_length(self->queueSend) > 0); + } + + gboolean + conCore_queuePullCheck(GSource* source) { /*\ + cdef : \*/ + conCoreObject* self = ((conCoreQueueSource*) source)->self; + + /* return TRUE if there is data queued + + gint g_async_queue_length (GAsyncQueue *queue); + */ + return (g_async_queue_length(self->queueSend) > 0); + } + + gboolean + conCore_queuePullDispatch(GSource* source, GSourceFunc callback, + gpointer user_data) { /*\ + cdef : \*/ + conCoreObject* self = ((conCoreQueueSource*) source)->self; + conCoreQueueMsg* pop; + + /* get the next callback reply and send it + + gpointer g_async_queue_pop (GAsyncQueue *queue); + */ + pop = g_async_queue_pop(self->queueSend); + printf("}%s", pop->message->str); + conCore_sessionSend(pop->session, pop->message->str, pop->message->len); + + /* free popped message before returning + + gchar* g_string_free (GString *string, + gboolean free_segment); + void g_free (gpointer mem); + */ + g_string_free(pop->message, TRUE); + g_free(pop); + + return TRUE; + } + + void + conCore_queuePullNew(conCoreObject* self) { /*\ + cdef : \*/ + conCoreQueueSource* source; + + /* populate srcfuncs + + typedef struct { + gboolean (*prepare) (GSource *source, + gint *timeout_); + gboolean (*check) (GSource *source); + gboolean (*dispatch) (GSource *source, + GSourceFunc callback, + gpointer user_data); + void (*finalize) (GSource *source); + } GSourceFuncs; + */ + self->queueFuncs.prepare = conCore_queuePullPrepare; + self->queueFuncs.check = conCore_queuePullCheck; + self->queueFuncs.dispatch = conCore_queuePullDispatch; + self->queueFuncs.finalize = NULL; + + /* return the new source + + GSource * g_source_new (GSourceFuncs *source_funcs, + guint struct_size); + */ + source = (conCoreQueueSource*) g_source_new(&self->queueFuncs, + sizeof(conCoreQueueSource)); + source->self = self; + g_source_attach((GSource*) source, self->context); + } + + /* + # + ########################################################################### + # # Channel Callbacks # */ @@ -874,31 +1006,6 @@ # ########################################################################### # - # Python Handles - # */ - static void - conCore_handle_client(conSession* session) { /*\ - cdef : \*/ - conCoreObject* self = session->core; - - /* push element buffer to callback queue - - void g_async_queue_push (GAsyncQueue *queue, - gpointer data); - */ - g_async_queue_push(self->queueRecv, session->ebuff); - - /* we've sent our element buffer into the queue, we need a new one - - GString* g_string_new (const gchar *init); - */ - session->ebuff = g_string_new(""); - } - - /* - # - ########################################################################### - # # XML Callbacks # */ static void @@ -943,7 +1050,10 @@ */ if (g_ascii_strcasecmp(name, "http://etherx.jabber.org/streams stream") == 0) - /* need to test version and other important attributes */ + /* need to test version and other important attributes + + + */ conCore_reply_stream(session); else session->state = CON_E_CLOSE; @@ -1092,7 +1202,7 @@ ...); */ g_string_append_printf(session->ebuff, "</%s>", element[1]); - conCore_handle_client(session); + conCore_queuePush(session); break; } case CON_E_SASL : { @@ -1222,6 +1332,3 @@ 0, /*tp_free*/ 0, /*tp_is_gc*/ }; - /* - # - #########################################################################*/ Modified: trunk/concordance/src/Core.h =================================================================== --- trunk/concordance/src/Core.h 2009-01-09 09:33:57 UTC (rev 1450) +++ trunk/concordance/src/Core.h 2009-01-09 10:19:52 UTC (rev 1451) @@ -57,6 +57,7 @@ Gsasl* saslCntx; /* sasl context for this Core */ GAsyncQueue* queueRecv; /* callback queue input */ GAsyncQueue* queueSend; /* callback queue output */ + GSourceFuncs queueFuncs; /* source functions */ } conCoreObject; @@ -84,6 +85,19 @@ GString* eto; /* JID of destination */ } conSession; + +typedef struct { + GString* message; + conSession* session; +} conCoreQueueMsg; + + +typedef struct { + GSource base; + conCoreObject* self; +} conCoreQueueSource; + + PyTypeObject conCore_Type; #define conCoreObject_Check(v) (Py_TYPE(v) == &conCore_Type) _______________________________________________ PySoy-SVN mailing list PySoy-SVN@pysoy.org http://www.pysoy.org/mailman/listinfo/pysoy-svn