Author: Armin Rigo <[email protected]>
Branch: 
Changeset: r306:6b07c127c560
Date: 2013-06-27 17:01 +0200
http://bitbucket.org/pypy/stmgc/changeset/6b07c127c560/

Log:    Implement transactions (incomplete so far)

diff --git a/duhton/consobject.c b/duhton/consobject.c
--- a/duhton/consobject.c
+++ b/duhton/consobject.c
@@ -1,10 +1,5 @@
 #include "duhton.h"
 
-typedef struct {
-    DuOBJECT_HEAD
-    DuObject *car, *cdr;
-} DuConsObject;
-
 
 void cons_trace(DuConsObject *ob, void visit(gcptr *))
 {
diff --git a/duhton/duhton.h b/duhton/duhton.h
--- a/duhton/duhton.h
+++ b/duhton/duhton.h
@@ -108,6 +108,11 @@
 DuObject *DuSymbol_FromString(const char *name);
 char *DuSymbol_AsString(DuObject *ob);
 
+typedef struct {
+    DuOBJECT_HEAD
+    DuObject *car, *cdr;
+} DuConsObject;
+
 DuObject *DuCons_New(DuObject *car, DuObject *cdr);
 DuObject *DuCons_Car(DuObject *cons);
 DuObject *DuCons_Cdr(DuObject *cons);
diff --git a/duhton/glob.c b/duhton/glob.c
--- a/duhton/glob.c
+++ b/duhton/glob.c
@@ -503,16 +503,14 @@
 
 DuObject *du_transaction(DuObject *cons, DuObject *locals)
 {
-    Du_FatalError("transaction: not implemented");
-#if 0
     if (cons == Du_None)
         Du_FatalError("transaction: expected at least one argument");
+
+    _du_read1(cons);
     DuObject *sym = _DuCons_CAR(cons);
     DuObject *rest = _DuCons_NEXT(cons);
     _DuFrame_EvalCall(locals, sym, rest, 0);
-    Du_INCREF(Du_None);
     return Du_None;
-#endif
 }
 
 DuObject *du_sleepms(DuObject *cons, DuObject *locals)
diff --git a/duhton/transaction.c b/duhton/transaction.c
--- a/duhton/transaction.c
+++ b/duhton/transaction.c
@@ -1,12 +1,140 @@
 #include "duhton.h"
+#include <pthread.h>
+#include <unistd.h>
 
+#define NUM_THREADS  4
+
+
+static DuConsObject du_pending_transactions = {
+    DuOBJECT_HEAD_INIT(DUTYPE_CONS),
+    NULL,
+    Du_None,
+};
+
+static pthread_mutex_t mutex_sleep = PTHREAD_MUTEX_INITIALIZER;
+static int thread_sleeping = 0;
+
+static void *run_thread(void *);   /* forward */
+
+static void run_all_threads(void)
+{
+    int i;
+    pthread_t th[NUM_THREADS];
+
+    for (i = 0; i < NUM_THREADS; i++) {
+        int status = pthread_create(&th[i], NULL, run_thread, NULL);
+        if (status != 0)
+            stm_fatalerror("status != 0\n");
+    }
+    for (i = 0; i < NUM_THREADS; i++) {
+        pthread_join(th[i], NULL);
+    }
+}
+
+/************************************************************/
 
 void Du_TransactionAdd(DuObject *code, DuObject *frame)
 {
-    /* XXX */
+    DuObject *cell = DuCons_New(code, frame);
+    DuObject *pending = (DuObject *)stm_thread_local_obj;
+
+    if (pending == NULL) {
+        pending = Du_None;
+    }
+    pending = DuCons_New(cell, pending);
+    stm_thread_local_obj = (gcptr)pending;
 }
 
 void Du_TransactionRun(void)
 {
-    /* XXX */
+    if (stm_thread_local_obj == NULL)
+        return;
+
+    DuConsObject *root = &du_pending_transactions;
+    _du_write1(root);
+    root->cdr = stm_thread_local_obj;
+
+    stm_commit_transaction();
+    run_all_threads();
+    stm_begin_inevitable_transaction();
 }
+
+/************************************************************/
+
+static DuObject *next_cell(void)
+{
+    DuObject *pending = (DuObject *)stm_thread_local_obj;
+
+    if (pending == NULL) {
+        /* fish from the global list of pending transactions */
+        DuConsObject *root;
+
+      restart:
+        root = &du_pending_transactions;
+        _du_read1(root);
+
+        if (root->cdr != Du_None) {
+            DuObject *cell = root->cdr;
+            _du_write1(root);
+
+            _du_read1(cell);
+            DuObject *result = _DuCons_CAR(cell);
+            root->cdr = _DuCons_NEXT(cell);
+
+            return result;
+        }
+        else {
+            /* nothing to do, wait */
+            thread_sleeping++;
+            if (thread_sleeping == NUM_THREADS) {
+                pthread_mutex_unlock(&mutex_sleep);
+            }
+            stm_commit_transaction();
+            pthread_mutex_lock(&mutex_sleep);
+            stm_begin_inevitable_transaction();
+            if (thread_sleeping == NUM_THREADS) {
+                pthread_mutex_unlock(&mutex_sleep);
+                return NULL;
+            }
+            thread_sleeping--;
+            goto restart;
+        }
+    }
+
+    /* we have at least one thread-local transaction pending */
+    _du_read1(pending);
+    DuObject *result = _DuCons_CAR(pending);
+    DuObject *next = _DuCons_NEXT(pending);
+
+    if (next != Du_None) {
+        /* we have more than one: add the others to the global list */
+        assert(!"XXX");
+        abort();
+    }
+
+    return result;
+}
+
+int run_transaction(gcptr cell, int retry_counter)
+{
+    DuObject *code  = DuCons_Car(cell);
+    DuObject *frame = DuCons_Cdr(cell);
+    Du_Progn(code, frame);
+    return 0;
+}
+
+void *run_thread(void *ignored)
+{
+    stm_initialize();
+
+    while (1) {
+        /* we are inevitable here */
+        DuObject *cell = next_cell();
+        if (cell == NULL)
+            break;
+        stm_perform_transaction(cell, run_transaction);
+    }
+
+    stm_finalize();
+    return NULL;
+}
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to