Dan, the source code is attached. I didn't write this; someone else from the forum did. Their name is not on it, nor is it copyrighted. I thought it was a clean way to test threading.
Interestingly, if you remove the shared cache everything runs to completion. Dan Kennedy <[EMAIL PROTECTED]> wrote: Hi Ken, Probably a bug in the new threading stuff. Can you share source code for this test or is it part of some large app? Either way, thanks for the report. Dan. On Wed, 2007-08-29 at 22:15 -0700, Ken wrote: > Also errors out here, sporadically. > int sqlite3OsWrite(sqlite3_file *id, const void *pBuf, int amt, i64 offset){ > return id->pMethods->xWrite(id, pBuf, amt, offset); > } > > Program received signal SIGSEGV, Segmentation fault. > [Switching to Thread 1075841376 (LWP 15747)] > 0x000000000040c413 in sqlite3OsWrite (id=0x55aaa0, pBuf=0x401ffc30, amt=24, > offset=0) at os.c:38 > (gdb) Quit > (gdb) > > Ken wrote: 4 threads, shared_Cache enabled > LOOP 100 > BEGIN > LOOP 50 times > INSERT > end LOOP > COMMIT > > SELECT COUNT(*) ... > end LOOP > > > program received signal SIGSEGV, Segmentation fault. > [Switching to Thread 1080043872 (LWP 15448)] > moveToChild (pCur=0x569058, newPgno=) at btree.c:3304 > (gdb) > > > if( rc ) return rc; > pNewPage->idxParent = pCur->idx; > pOldPage = pCur->pPage; > pOldPage->idxShift = 0; <---------------- Error Here > releasePage(pOldPage); > pCur->pPage = pNewPage; > pCur->idx = 0; > pCur->info.nSize = 0; > > > Ken > > > > > Ken wrote: 4 threads, shared_Cache enabled > LOOP 100 > BEGIN > LOOP 50 times > INSERT > end LOOP > COMMIT > > SELECT COUNT(*) ... > end LOOP > > > program received signal SIGSEGV, Segmentation fault. 
> [Switching to Thread 1080043872 (LWP 15448)] > moveToChild (pCur=0x569058, newPgno=) at btree.c:3304 > (gdb) > > > if( rc ) return rc; > pNewPage->idxParent = pCur->idx; > pOldPage = pCur->pPage; > pOldPage->idxShift = 0; <---------------- Error Here > releasePage(pOldPage); > pCur->pPage = pNewPage; > pCur->idx = 0; > pCur->info.nSize = 0; > > > Ken > > ----------------------------------------------------------------------------- To unsubscribe, send email to [EMAIL PROTECTED] -----------------------------------------------------------------------------
#include <stdio.h> #include <sqlite3.h> #include <stdlib.h> #include <string.h> #include <sys/stat.h> #include <pthread.h> #ifdef __Linux__ # define _XOPEN_SOURCE 500 #endif #include <unistd.h> #define SQLITEDB "test.db" #define NUM_THREADS 4 #define MAX_BUSY 25 #define NUM_LOOPS 200 #define NUM_INSERTS 10 #define SHARED_CACHE 1 #ifndef RETRY_BUSY #define RETRY_BUSY 1 #endif static void sync_thread(int thnum); typedef struct sql_result { int alloc_rows; int rows; int cols; char ***data; } sql_result; typedef struct th_st { int th_num; /* 0 = not started, 1 = started, 2 = done, 3 = error */ int state; } th_st; th_st *th_data = NULL; pthread_mutex_t global_mutex; int on_thread = 0; #define SQLITE_ALLOC_ROWS 1024 static int sqlite3_rowcallback(void *myresult, int argc, char **argv, char **colName) { sql_result *result = myresult; int i; (void)colName; if (result == NULL) return (SQLITE_ERROR); if (result->cols == 0) result->cols=argc; else if (result->cols != argc) return (SQLITE_ERROR); if (result->rows >= result->alloc_rows) { result->data=(char ***)realloc(result->data, (result->alloc_rows+SQLITE_ALLOC_ROWS)*sizeof(char **)); result->alloc_rows += SQLITE_ALLOC_ROWS; } result->data[result->rows] = (char **)malloc((result->cols)*sizeof(char *)); for (i=0; i<result->cols; i++) { if (argv[i] == NULL) result->data[result->rows][i] = NULL; else result->data[result->rows][i] = strdup(argv[i]); } result->rows++; return(SQLITE_OK); } static void sqlite3_printresult(sql_result *result) { int i, j; if (!result->rows) return; printf("Query Results:\n"); for (i=0; i<result->rows; i++) { for (j=0; j<result->cols; j++) { if (j != 0) printf(","); printf("%s", result->data[i][j]); } printf("\n"); } printf("\n"); } static void sqlite3_freeresult(sql_result *result) { int i, j; if (!result->rows) return; for (i=0; i<result->rows; i++) { for (j=0; j<result->cols; j++) { free(result->data[i][j]); } free(result->data[i]); } free(result->data); } /** sqlite3_execute * Description: 
Execute SQL statment, loop on busy * Return : 1 on success, 0 on failure, -1 on rollback/concurrency issue */ static int sqlite3_execute(sqlite3 *db_conn, const char *statement, int thnum) { int rc, retval; sql_result result; char *errmsg = NULL; #if RETRY_BUSY int busy_cnt = 0; #endif memset(&result, 0, sizeof(result)); do { errmsg = NULL; printf("%d => Executing: %s\n", thnum, statement); rc = sqlite3_exec(db_conn, statement, sqlite3_rowcallback, &result, &errmsg); switch (rc) { case SQLITE_OK: retval = 1; break; case SQLITE_BUSY: #if RETRY_BUSY busy_cnt++; break; #endif case SQLITE_LOCKED: /* Issue a rollback if not in autocommit mode */ if (!sqlite3_get_autocommit(db_conn)) { if (sqlite3_execute(db_conn, "ROLLBACK", thnum) != 1) { printf("%d => ROLLBACK failed\n", thnum); } else { printf("%d => ROLLBACK succeeded\n", thnum); } } retval = -1; break; default: printf("%d => query failed: %s\n", thnum, errmsg); retval = 0; break; } if (errmsg != NULL) { sqlite3_free(errmsg); errmsg = NULL; } #if RETRY_BUSY /* Don't loop too fast */ if (rc == SQLITE_BUSY) { if (busy_cnt > MAX_BUSY) { rc = SQLITE_ERROR; retval = 0; printf("%d => MAX BUSY CNT\n", thnum); } else { printf("%d => BUSY\n", thnum); usleep(40000); } } } while (rc == SQLITE_BUSY); #else } while (0); #endif sqlite3_printresult(&result); sqlite3_freeresult(&result); return(retval); } static void sync_thread(int thnum) { /* Unassign from self, and assign to next thread */ pthread_mutex_lock(&global_mutex); if (on_thread == thnum) on_thread++; pthread_mutex_unlock(&global_mutex); while(1) { pthread_mutex_lock(&global_mutex); /* Thread could disappear */ if (on_thread < NUM_THREADS && th_data[on_thread].state >= 2) on_thread++; if (on_thread >= NUM_THREADS) on_thread = 0; if (on_thread == thnum) { pthread_mutex_unlock(&global_mutex); break; } pthread_mutex_unlock(&global_mutex); usleep(20000); } } void *testthread(void *arg) { int rc; int rollback = 0; sqlite3 *db_conn = NULL; th_st *th = arg; char temp[255]; int 
i, j; rc = sqlite3_open(SQLITEDB , &db_conn); if (rc) { printf("%d => sqlite3_open() failed: %s\n", th->th_num, sqlite3_errmsg(db_conn)); sqlite3_close(db_conn); th->state = 3; return(NULL); } printf("%d => started ....\n", th->th_num); th->state = 1; // Release all at once pthread_mutex_lock(&global_mutex); pthread_mutex_unlock(&global_mutex); j = 0; /* We want to synchronize accesses by threads, just to see what happens */ for ( j = 0; j < NUM_LOOPS; j++) { do { if (rollback) { printf("%d => Retrying transaction ...\n", th->th_num); usleep(50000); } rollback = 0; rc = sqlite3_execute(db_conn, "BEGIN IMMEDIATE", th->th_num); // rc = sqlite3_execute(db_conn, "BEGIN", th->th_num); if (rc == -1) { rollback = 1; continue; } else if (rc == 0) break; for (i=0; i<NUM_INSERTS; i++) { snprintf(temp, sizeof(temp), "INSERT INTO test_table VALUES(%d, %d, 'test%d_%d')", th->th_num, i, th->th_num, i); rc = sqlite3_execute(db_conn, temp, th->th_num); if (rc == -1) { rollback = 1; break;; } else if (rc == 0) break; } if (rc == 0) break; if (rollback) continue; rc = sqlite3_execute(db_conn, "COMMIT", th->th_num); if (rc == -1) { rollback = 1; continue; } else if (rc == 0) break; } while (rollback); usleep(1000); sqlite3_execute(db_conn, "select count(*) from test_Table ", th->th_num ); usleep(1000); } sqlite3_close(db_conn); if (rc == 0) { printf("%d => thread failed ...\n", th->th_num); th->state = 3; } else { th->state = 2; printf("%d => finished.\n", th->th_num); } return(NULL); } int main() { sqlite3 *db_conn = NULL; int rc, i; int retval = 0; struct stat myst; int trigger = 0; pthread_t thread; /* If the database already exists, lets go ahead and delete it */ if (stat(SQLITEDB, &myst) != -1) { unlink(SQLITEDB); } rc = sqlite3_open(SQLITEDB , &db_conn); if (rc) { printf("sqlite3_open() failed: %s\n", sqlite3_errmsg(db_conn)); sqlite3_close(db_conn); return(1); } #ifdef SHARED_CACHE rc = sqlite3_enable_shared_cache(1); if (rc != SQLITE_OK) { printf("failed to enable shared 
cache\n"); sqlite3_close(db_conn); return(1); } else printf("Shared Cache Enabled\n"); #endif /* Pre-create a table */ printf("Creating a table\n"); rc = sqlite3_execute(db_conn, "CREATE TABLE test_table(threadnum INT, cnt INT, testcol TEXT)", 0); if (rc != 1) { printf("failed to create table\n"); sqlite3_close(db_conn); return(1); } sqlite3_close(db_conn); pthread_mutex_init(&global_mutex, NULL); pthread_mutex_lock(&global_mutex); /* Start all threads */ th_data = malloc(sizeof(*th_data)*NUM_THREADS); for (i=0; i<NUM_THREADS; i++) { th_data[i].th_num = i; th_data[i].state = 0; pthread_create(&thread, NULL, testthread, &(th_data[i])); pthread_detach(thread); } /* Wait for all threads to start, then release mutex */ trigger = 0; while (!trigger) { trigger = 1; for (i=0; i<NUM_THREADS; i++) if (!th_data[i].state) trigger = 0; usleep(50000); } printf("all threads started\n"); pthread_mutex_unlock(&global_mutex); /* Wait for all threads to finish, then cleanup and exit */ trigger = 0; while (!trigger) { trigger = 1; for (i=0; i<NUM_THREADS; i++) if (th_data[i].state < 2) { trigger = 0; } else if (th_data[i].state > 2) { retval = 1; } usleep(50000); } pthread_mutex_destroy(&global_mutex); free(th_data); printf("exiting...(test %s)\n", (retval==0)?"succeeded":"failed"); return(retval); }
----------------------------------------------------------------------------- To unsubscribe, send email to [EMAIL PROTECTED] -----------------------------------------------------------------------------