Author: ArcRiley
Date: 2009-01-09 05:19:52 -0500 (Fri, 09 Jan 2009)
New Revision: 1451

Modified:
   trunk/concordance/src/Core.c
   trunk/concordance/src/Core.h
Log:
callback can now respond


Modified: trunk/concordance/src/Core.c
===================================================================
--- trunk/concordance/src/Core.c        2009-01-09 09:33:57 UTC (rev 1450)
+++ trunk/concordance/src/Core.c        2009-01-09 10:19:52 UTC (rev 1451)
@@ -25,6 +25,7 @@
 static gpointer   conCore_loop          (gpointer);
 static int        conCore_listenChannel (conCoreObject*, conChannel*);
 static gboolean   conCore_listenNew     (GIOChannel*, GIOCondition, gpointer);
+void              conCore_queuePullNew  (conCoreObject* self);
 static gboolean   conCore_sessionRead   (GIOChannel*, GIOCondition, gpointer);
 static gboolean   conCore_sessionWrite  (GIOChannel*, GIOCondition, gpointer);
 static void       conCore_xmlStart      (gpointer, const XML_Char*,
@@ -56,6 +57,7 @@
       */
       self->queueRecv = g_async_queue_new();
       self->queueSend = g_async_queue_new();
+
     }
     return (PyObject*) self;
   }
@@ -148,6 +150,9 @@
     */
     self->context = g_main_context_new();
 
+    /* temporary */
+    conCore_queuePullNew(self);
+
     /* open the listening channels
 
        Note that the sockets default to -1 and remain -1 on failure, the
@@ -249,7 +254,7 @@
       PyObject*         sinput;
       PyObject*         tinput;
       PyObject*         output;
-      GString*          popped;
+      conCoreQueueMsg*  popped;
       GTimeVal          popEnd;
 
     /* ensure there are no arguments save self */
@@ -259,17 +264,17 @@
     /* loop until a signal is raised
 
        int              PyErr_CheckSignals       ();
-       Returns 0 or -1 if there's a signal
+       Returns 0 normally, or -1 if a signal has been raised.
     */
     while (PyErr_CheckSignals() == 0) {
-      /* calculate end time to wait for next callback
+      /* calculate end time to wait for next callback, 0.5 seconds ahead
 
          void           g_get_current_time       (GTimeVal *result);
          void           g_time_val_add           (GTimeVal *time_,
                                                   glong microseconds);
       */
       g_get_current_time(&popEnd);
-      g_time_val_add(&popEnd, 10000);
+      g_time_val_add(&popEnd, 50000);
 
       /* release the GIL while waiting for the next callback
 
@@ -290,7 +295,8 @@
          PyObject*      PyTuple_Pack                  (Py_ssize_t n,
                                                        ...)
       */
-      sinput = PyUnicode_FromStringAndSize(popped->str, popped->len);
+      sinput = PyUnicode_FromStringAndSize(popped->message->str,
+                                           popped->message->len);
       tinput = PyTuple_Pack(1, sinput);
 
       /* call our handler
@@ -303,19 +309,25 @@
       handle = PyObject_GetAttrString(s, "clientHandle");
       output = PyEval_CallObject(handle, tinput);
 
-      /* do nothing with the output, yet. we need a GMainLoop event for it */
+      /* reset popped->message to reuse with response, convert utf16 to utf8
 
-      /* free the objects we created and the GString* we popped
+         GString*       g_string_assign          (GString *string,
+                                                  const gchar *rval);
+         void           g_async_queue_push       (GAsyncQueue *queue,
+                                                  gpointer data);
+      */
+      popped->message = g_string_assign(popped->message,
+                                        conPyUnicodeToUTF8(output));
+      g_async_queue_push(self->queueSend, popped);
 
+      /* free the objects we're done with
+
          void           Py_DECREF                (PyObject *o);
-         gchar*         g_string_free            (GString *string,
-                                                  gboolean free_segment);
       */
       Py_DECREF(output);
       Py_DECREF(handle);
       Py_DECREF(tinput);
       Py_DECREF(sinput);
-      g_string_free(popped, TRUE);
     }
 
     /* signal was raised, return with error condition */
@@ -516,6 +528,126 @@
   #
   ###########################################################################
   #
+  # Callback Queue Handling
+  #                                                                        */
+  static void
+  conCore_queuePush(conSession* session) {                                /*\
+    cdef :                                                                \*/
+      conCoreObject*        self = session->core;
+      conCoreQueueMsg*      push;
+
+    /* alloc push */
+    push = g_malloc(sizeof(conCoreQueueMsg));
+
+
+    /* we're pushing our element buffer onto the queue, so we need a new one
+
+       GString*         g_string_new             (const gchar *init);
+    */
+    push->message = session->ebuff;
+    session->ebuff = g_string_new("");
+    push->session = session;
+
+    /* push message onto the callback queue
+
+       void             g_async_queue_push       (GAsyncQueue *queue,
+                                                  gpointer data);
+    */
+    g_async_queue_push(self->queueRecv, push);
+  }
+
+
+  gboolean
+  conCore_queuePullPrepare(GSource* source, gint* timeout) {              /*\
+    cdef :                                                                \*/
+      conCoreObject*    self = ((conCoreQueueSource*) source)->self;
+
+    /* poll timeout 1/25th of a second */
+    *timeout = 40;
+
+    /* return TRUE if there is data queued
+
+       gint             g_async_queue_length     (GAsyncQueue *queue);
+    */
+    return (g_async_queue_length(self->queueSend) > 0);
+  }
+
+  gboolean
+  conCore_queuePullCheck(GSource* source) {                               /*\
+    cdef :                                                                \*/
+      conCoreObject*    self = ((conCoreQueueSource*) source)->self;
+
+    /* return TRUE if there is data queued
+
+       gint             g_async_queue_length     (GAsyncQueue *queue);
+    */
+    return (g_async_queue_length(self->queueSend) > 0);
+  }
+
+  gboolean
+  conCore_queuePullDispatch(GSource* source, GSourceFunc callback,
+                            gpointer user_data) {                         /*\
+    cdef :                                                                \*/
+      conCoreObject*    self = ((conCoreQueueSource*) source)->self;
+      conCoreQueueMsg*  pop;
+
+    /* get the next callback reply and send it
+
+       gpointer         g_async_queue_pop  (GAsyncQueue *queue);
+    */      
+    pop = g_async_queue_pop(self->queueSend);
+    printf("}%s", pop->message->str);
+    conCore_sessionSend(pop->session, pop->message->str, pop->message->len);
+
+    /* free popped message before returning
+
+       gchar*           g_string_free            (GString *string,
+                                                  gboolean free_segment);
+       void             g_free                   (gpointer mem);
+    */
+    g_string_free(pop->message, TRUE);
+    g_free(pop);
+
+    return TRUE;
+  }
+
+  void
+  conCore_queuePullNew(conCoreObject* self) {                             /*\
+    cdef :                                                                \*/
+      conCoreQueueSource* source;
+
+    /* populate srcfuncs
+
+       typedef struct {
+         gboolean (*prepare)  (GSource    *source,
+                               gint       *timeout_);
+         gboolean (*check)    (GSource    *source);
+         gboolean (*dispatch) (GSource    *source,
+                               GSourceFunc callback,
+                               gpointer    user_data);
+         void     (*finalize) (GSource    *source);
+       } GSourceFuncs;
+    */
+    self->queueFuncs.prepare  = conCore_queuePullPrepare;
+    self->queueFuncs.check    = conCore_queuePullCheck;
+    self->queueFuncs.dispatch = conCore_queuePullDispatch;
+    self->queueFuncs.finalize = NULL;
+
+    /* return the new source
+
+       GSource *        g_source_new             (GSourceFuncs *source_funcs,
+                                                  guint struct_size);
+    */
+    source = (conCoreQueueSource*) g_source_new(&self->queueFuncs, 
+                                                sizeof(conCoreQueueSource));
+    source->self = self;
+    g_source_attach((GSource*) source, self->context);
+  }
+  
+  /*
+  #
+  ###########################################################################
+  #
   # Channel Callbacks
   #                                                                        */
 
@@ -874,31 +1006,6 @@
   #
   ###########################################################################
   #
-  # Python Handles
-  #                                                                        */
-  static void
-  conCore_handle_client(conSession* session) {                            /*\
-    cdef :                                                                \*/
-      conCoreObject*        self = session->core;
-
-    /* push element buffer to callback queue
-
-       void             g_async_queue_push       (GAsyncQueue *queue,
-                                                  gpointer data);
-    */
-    g_async_queue_push(self->queueRecv, session->ebuff);
-
-    /* we've sent our element buffer into the queue, we need a new one
-
-       GString*         g_string_new             (const gchar *init);
-    */
-    session->ebuff = g_string_new("");
-  }
-
-  /*
-  #
-  ###########################################################################
-  #
   # XML Callbacks
   #                                                                        */
   static void
@@ -943,7 +1050,10 @@
         */
         if (g_ascii_strcasecmp(name,
                                "http://etherx.jabber.org/streams stream") == 0)
-          /* need to test version and other important attributes */
+          /* need to test version and other important attributes
+
+
+          */
           conCore_reply_stream(session);
         else
           session->state = CON_E_CLOSE;
@@ -1092,7 +1202,7 @@
                                                   ...);
             */
             g_string_append_printf(session->ebuff, "</%s>", element[1]);
-            conCore_handle_client(session);
+            conCore_queuePush(session);
             break;
           }
           case CON_E_SASL : {
@@ -1222,6 +1332,3 @@
     0,                                 /*tp_free*/
     0,                                 /*tp_is_gc*/
   };
-  /*
-  #
-  #########################################################################*/

Modified: trunk/concordance/src/Core.h
===================================================================
--- trunk/concordance/src/Core.h        2009-01-09 09:33:57 UTC (rev 1450)
+++ trunk/concordance/src/Core.h        2009-01-09 10:19:52 UTC (rev 1451)
@@ -57,6 +57,7 @@
   Gsasl*           saslCntx;           /* sasl context for this Core */
   GAsyncQueue*     queueRecv;          /* callback queue input */
   GAsyncQueue*     queueSend;          /* callback queue output */
+  GSourceFuncs     queueFuncs;         /* source functions */
 } conCoreObject;
 
 
@@ -84,6 +85,19 @@
   GString*         eto;                /* JID of destination */
 } conSession;
 
+
+typedef struct {
+  GString*         message;
+  conSession*      session;
+} conCoreQueueMsg;
+
+
+typedef struct {
+  GSource          base;
+  conCoreObject*   self;
+} conCoreQueueSource;
+
+
 PyTypeObject conCore_Type;
 
 #define conCoreObject_Check(v)  (Py_TYPE(v) == &conCore_Type)

_______________________________________________
PySoy-SVN mailing list
PySoy-SVN@pysoy.org
http://www.pysoy.org/mailman/listinfo/pysoy-svn

Reply via email to