Dan, source code is attached. I didn't write this; someone else from the forum 
did. Their name is not on it, nor is it copyrighted. I thought it was a clean way to 
test threading.

Interestingly, if you remove the shared cache, everything runs to completion.



Dan Kennedy <[EMAIL PROTECTED]> wrote: 
Hi Ken,

Probably a bug in the new threading stuff. Can you share
source code for this test or is it part of some large app?

Either way, thanks for the report.

Dan.

On Wed, 2007-08-29 at 22:15 -0700, Ken wrote:
> Also errors out here, sporadically.
> int sqlite3OsWrite(sqlite3_file *id, const void *pBuf, int amt, i64 offset){
>   return id->pMethods->xWrite(id, pBuf, amt, offset);
> }
> 
> Program received signal SIGSEGV, Segmentation fault.
> [Switching to Thread 1075841376 (LWP 15747)]
> 0x000000000040c413 in sqlite3OsWrite (id=0x55aaa0, pBuf=0x401ffc30, amt=24, 
> offset=0) at os.c:38
> (gdb) Quit
> (gdb) 
> 
> Ken  wrote: 4 threads, shared_Cache enabled
> LOOP 100 
>     BEGIN
>      LOOP 50 times
>       INSERT
>      end LOOP
>      COMMIT
> 
>       SELECT COUNT(*) ...
> end LOOP
> 
> 
> program received signal SIGSEGV, Segmentation fault.
> [Switching to Thread 1080043872 (LWP 15448)]
> moveToChild (pCur=0x569058, newPgno=) at btree.c:3304
> (gdb) 
> 
> 
> if( rc ) return rc;
>   pNewPage->idxParent = pCur->idx;
>   pOldPage = pCur->pPage;
>   pOldPage->idxShift = 0;     <---------------- Error Here
>   releasePage(pOldPage);
>   pCur->pPage = pNewPage;
>   pCur->idx = 0;
>   pCur->info.nSize = 0;
> 
> 
> Ken
> 
> 
> 
> 
> Ken  wrote: 4 threads, shared_Cache enabled
> LOOP 100 
>     BEGIN
>      LOOP 50 times
>       INSERT
>      end LOOP
>      COMMIT
> 
>       SELECT COUNT(*) ...
> end LOOP
> 
> 
> program received signal SIGSEGV, Segmentation fault.
> [Switching to Thread 1080043872 (LWP 15448)]
> moveToChild (pCur=0x569058, newPgno=) at btree.c:3304
> (gdb) 
> 
> 
> if( rc ) return rc;
>   pNewPage->idxParent = pCur->idx;
>   pOldPage = pCur->pPage;
>   pOldPage->idxShift = 0;     <---------------- Error Here
>   releasePage(pOldPage);
>   pCur->pPage = pNewPage;
>   pCur->idx = 0;
>   pCur->info.nSize = 0;
> 
> 
> Ken
> 
> 


-----------------------------------------------------------------------------
To unsubscribe, send email to [EMAIL PROTECTED]
-----------------------------------------------------------------------------


#include <stdio.h>
#include <sqlite3.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <pthread.h>

#ifdef __Linux__
#  define _XOPEN_SOURCE 500
#endif
#include <unistd.h>

/* Path of the database file; main() deletes and recreates it on every run. */
#define SQLITEDB "test.db"
/* Number of worker threads started by main(). */
#define NUM_THREADS 4
/* Maximum number of SQLITE_BUSY retries per statement in sqlite3_execute(). */
#define MAX_BUSY 25
/* Number of transaction + SELECT rounds each worker thread performs. */
#define NUM_LOOPS   200
/* Number of INSERT statements issued inside each transaction. */
#define NUM_INSERTS 10

/* When defined (tested with #ifdef, so the value itself is not looked at),
 * main() calls sqlite3_enable_shared_cache(1). */
#define SHARED_CACHE 1

/* Non-zero: sqlite3_execute() sleeps and retries on SQLITE_BUSY.
 * Zero: SQLITE_BUSY is handled like SQLITE_LOCKED (rollback, return -1).
 * May be overridden from the compiler command line. */
#ifndef RETRY_BUSY
#define RETRY_BUSY 1
#endif

static void sync_thread(int thnum);

/* Accumulated result set of one sqlite3_exec() call, filled in row by row
 * by sqlite3_rowcallback() and released by sqlite3_freeresult(). */
typedef struct sql_result {
	/* Number of row slots currently allocated in data. */
	int alloc_rows;
	/* Number of rows stored so far. */
	int rows;
	/* Column count, fixed by the first row received (0 = no row yet). */
	int cols;
	/* data[row][col]: heap copy of the column text, or NULL for a NULL value. */
	char ***data;
} sql_result;

/* Per-thread bookkeeping; one entry per worker, passed to testthread(). */
typedef struct th_st {
	/* Index of the thread, used as a prefix in log output and as inserted data. */
	int th_num;
	/* 0 = not started, 1 = started, 2 = done, 3 = error.
	 * Written by the worker and polled by main() without any locking. */
	int state;
} th_st;

/* Array of NUM_THREADS entries, allocated and freed by main(). */
th_st *th_data = NULL;

/* Held by main() while the workers start up so that they are all released
 * together; also guards on_thread inside sync_thread(). */
pthread_mutex_t global_mutex;
/* Index of the thread whose turn it currently is (used by sync_thread()). */
int on_thread = 0;

/* Number of row slots added to sql_result.data each time it has to grow. */
#define SQLITE_ALLOC_ROWS 1024

/** sqlite3_rowcallback
 *  Description: sqlite3_exec() row callback. Appends a deep copy of the row
 *               to the sql_result passed as the callback's user pointer.
 *  Parameters : myresult - the sql_result to append to
 *               argc     - number of columns in this row
 *               argv     - column values as text; an entry is NULL for SQL NULL
 *               colName  - column names (unused)
 *  Return     : SQLITE_OK on success; SQLITE_ERROR if myresult is NULL, the
 *               column count differs from earlier rows, or memory runs out.
 *               On failure the result set is left as it was before the call.
 */
static int sqlite3_rowcallback(void *myresult, int argc, char **argv, char **colName)
{
	sql_result *result = myresult;
	char **row;
	int i;

	(void)colName;
	if (result == NULL)
		return (SQLITE_ERROR);
	if (result->cols == 0)
		result->cols = argc;
	else if (result->cols != argc)
		return (SQLITE_ERROR);

	/* Grow the row table in fixed steps. Keep the old pointer until realloc
	 * has succeeded so that a failure neither leaks nor loses stored rows. */
	if (result->rows >= result->alloc_rows) {
		size_t want = (size_t)result->alloc_rows + SQLITE_ALLOC_ROWS;
		char ***grown = realloc(result->data, want * sizeof(*grown));

		if (grown == NULL)
			return (SQLITE_ERROR);
		result->data = grown;
		result->alloc_rows += SQLITE_ALLOC_ROWS;
	}

	row = malloc((size_t)result->cols * sizeof(*row));
	if (row == NULL && result->cols > 0)
		return (SQLITE_ERROR);

	for (i = 0; i < result->cols; i++) {
		if (argv[i] == NULL) {
			row[i] = NULL;
			continue;
		}
		row[i] = strdup(argv[i]);
		if (row[i] == NULL) {
			/* Undo the partially built row before reporting the failure. */
			while (i-- > 0)
				free(row[i]);
			free(row);
			return (SQLITE_ERROR);
		}
	}

	/* Publish the row only once it is complete. */
	result->data[result->rows] = row;
	result->rows++;
	return (SQLITE_OK);
}

/** sqlite3_printresult
 *  Description: Print a result set to stdout, one comma-separated line per
 *               row. Prints nothing at all when the set holds no rows.
 *  Parameters : result - result set to print
 */
static void sqlite3_printresult(sql_result *result)
{
	int i, j;
	if (!result->rows) return;

	printf("Query Results:\n");
	for (i=0; i<result->rows; i++) {
		for (j=0; j<result->cols; j++) {
			const char *cell = result->data[i][j];

			if (j != 0)
				printf(",");
			/* SQL NULL is stored as a NULL pointer; handing that to "%s" is
			 * undefined behaviour, so print a placeholder explicitly. */
			printf("%s", (cell != NULL) ? cell : "(null)");
		}
		printf("\n");
	}
	printf("\n");
}

/** sqlite3_freeresult
 *  Description: Release every row, every column string and the row table of
 *               a result set, then reset it to the empty state so it can be
 *               reused or freed again safely.
 *  Parameters : result - result set to release; NULL is ignored
 */
static void sqlite3_freeresult(sql_result *result)
{
	int i, j;

	if (result == NULL)
		return;

	for (i=0; i<result->rows; i++) {
		for (j=0; j<result->cols; j++) {
			free(result->data[i][j]);
		}
		free(result->data[i]);
	}
	/* Free the table even when no rows were stored; free(NULL) is a no-op. */
	free(result->data);

	/* Leave no dangling pointer or stale counts behind. */
	result->data = NULL;
	result->rows = 0;
	result->cols = 0;
	result->alloc_rows = 0;
}

/** sqlite3_execute
 *  Description: Execute SQL statement, loop on busy. Any rows returned are
 *               printed to stdout and then discarded.
 *               With RETRY_BUSY enabled, SQLITE_BUSY is retried up to
 *               MAX_BUSY times with a 40 ms pause between attempts; after
 *               that the call fails. With RETRY_BUSY disabled, SQLITE_BUSY
 *               is treated like SQLITE_LOCKED.
 *               On SQLITE_LOCKED a ROLLBACK is issued if a transaction is
 *               open on db_conn.
 *  Parameters : db_conn   - open database connection
 *               statement - SQL text to run
 *               thnum     - thread number, used only as a log prefix
 *  Return     : 1 on success, 0 on failure, -1 on rollback/concurrency issue
 */
static int sqlite3_execute(sqlite3 *db_conn, const char *statement,  int thnum)
{
	int rc;
	int retval = 0;	/* defined on every path, including ones added later */
	int retry;
	sql_result result;
	char *errmsg;
#if RETRY_BUSY
	int busy_cnt = 0;
#endif

	memset(&result, 0, sizeof(result));

	do {
		retry = 0;
		errmsg = NULL;
		printf("%d => Executing: %s\n", thnum, statement);
		rc = sqlite3_exec(db_conn, statement, sqlite3_rowcallback, &result, &errmsg);
		switch (rc) {
		case SQLITE_OK:
			retval = 1;
			break;
		case SQLITE_BUSY:
#if RETRY_BUSY
			busy_cnt++;
			if (busy_cnt > MAX_BUSY) {
				retval = 0;
				printf("%d => MAX BUSY CNT\n", thnum);
			} else {
				retry = 1;
				printf("%d => BUSY\n", thnum);
			}
			break;
#endif
			/* fall through: without RETRY_BUSY, busy is handled as locked */
		case SQLITE_LOCKED:
			/* Issue a rollback if not in autocommit mode */
			if (!sqlite3_get_autocommit(db_conn)) {
				if (sqlite3_execute(db_conn, "ROLLBACK", thnum) != 1) {
					printf("%d => ROLLBACK failed\n", thnum);
				} else {
					printf("%d => ROLLBACK succeeded\n", thnum);
				}
			}
			retval = -1;
			break;
		default:
			/* errmsg may be NULL (e.g. out of memory inside SQLite); never
			 * pass a NULL pointer to "%s". */
			printf("%d => query failed: %s\n", thnum,
					(errmsg != NULL) ? errmsg : sqlite3_errmsg(db_conn));
			retval = 0;
			break;
		}

		/* sqlite3_free(NULL) is a harmless no-op. */
		sqlite3_free(errmsg);
		errmsg = NULL;

		/* Don't loop too fast */
		if (retry)
			usleep(40000);
	} while (retry);

	sqlite3_printresult(&result);
	sqlite3_freeresult(&result);

	return(retval);
}

/*
 * Hand the turn to the next thread and block until it comes back around.
 *
 * If the calling thread currently owns the turn, the turn is advanced by
 * one. The function then polls every 20 ms, under global_mutex, until
 * on_thread equals thnum again. While polling, a turn that points at a
 * thread whose state is 2 or 3 is advanced by one, and an index that has run
 * past the last thread wraps to 0.
 *
 * thnum: index of the calling thread.
 */
static void sync_thread(int thnum)
{
	int my_turn;

	/* Give up our own turn first. */
	pthread_mutex_lock(&global_mutex);
	if (on_thread == thnum) {
		on_thread += 1;
	}
	pthread_mutex_unlock(&global_mutex);

	for (;;) {
		pthread_mutex_lock(&global_mutex);

		/* Skip over a thread that has already finished or failed. */
		if (on_thread < NUM_THREADS && th_data[on_thread].state >= 2) {
			on_thread += 1;
		}

		/* Wrap around after the last thread. */
		if (on_thread >= NUM_THREADS) {
			on_thread = 0;
		}

		my_turn = (on_thread == thnum);
		pthread_mutex_unlock(&global_mutex);

		if (my_turn) {
			return;
		}
		usleep(20000);
	}
}



/*
 * Worker thread body.
 *
 * Opens its own connection to SQLITEDB, waits until main() releases
 * global_mutex, then runs NUM_LOOPS rounds. Each round is one
 * BEGIN IMMEDIATE / NUM_INSERTS x INSERT / COMMIT transaction followed by a
 * SELECT COUNT(*). A transaction for which sqlite3_execute() reports -1 is
 * retried after a 50 ms pause until it either commits or fails hard.
 *
 * A hard failure (sqlite3_execute() returning 0) in any round is remembered
 * and reported at the end, and a transaction left open by it is rolled back
 * so that the following rounds can still BEGIN.
 *
 * arg: pointer to this thread's th_st entry. Its state is set to 1 once the
 *      connection is open, and to 2 (all rounds succeeded) or 3 (open failed
 *      or some round failed) as the very last access to the entry.
 *
 * Returns NULL always.
 */
void *testthread(void *arg)
{
	th_st *th = arg;
	sqlite3 *db_conn = NULL;
	char temp[255];
	int rc;
	int rollback = 0;
	int failed = 0;
	int i, j;

	rc = sqlite3_open(SQLITEDB , &db_conn);
	if (rc) {
		printf("%d => sqlite3_open() failed: %s\n", th->th_num, sqlite3_errmsg(db_conn));
		sqlite3_close(db_conn);
		th->state = 3;
		return(NULL);
	}

	printf("%d => started ....\n", th->th_num);
	th->state = 1;

	/* main() holds global_mutex until every thread has reported in, so this
	 * lock/unlock pair acts as a start gate that releases all workers at once. */
	pthread_mutex_lock(&global_mutex);
	pthread_mutex_unlock(&global_mutex);

	for (j = 0; j < NUM_LOOPS; j++) {
		do {
			if (rollback) {
				printf("%d => Retrying transaction ...\n", th->th_num);
				usleep(50000);
			}
			rollback = 0;

			/* Each step runs only while the previous one returned 1. */
			rc = sqlite3_execute(db_conn, "BEGIN IMMEDIATE", th->th_num);
			for (i = 0; rc == 1 && i < NUM_INSERTS; i++) {
				snprintf(temp, sizeof(temp), "INSERT INTO test_table VALUES(%d, %d, 'test%d_%d')",
						th->th_num, i, th->th_num, i);
				rc = sqlite3_execute(db_conn, temp,  th->th_num);
			}
			if (rc == 1)
				rc = sqlite3_execute(db_conn, "COMMIT", th->th_num);

			/* -1: sqlite3_execute() already rolled back; run it again. */
			if (rc == -1)
				rollback = 1;
		} while (rollback);

		if (rc == 0) {
			/* Remember the failure; a later successful round must not hide it. */
			failed = 1;
			/* Do not leave a half-done transaction open, otherwise every
			 * following BEGIN IMMEDIATE on this connection would fail too. */
			if (!sqlite3_get_autocommit(db_conn))
				sqlite3_execute(db_conn, "ROLLBACK", th->th_num);
		}

		usleep(1000);
		sqlite3_execute(db_conn, "select count(*) from test_Table ",  th->th_num );
		usleep(1000);
	}

	sqlite3_close(db_conn);

	/* Publish the final state last: main() frees th_data as soon as every
	 * thread shows a final state, so th must not be touched afterwards. */
	if (failed) {
		printf("%d => thread failed ...\n", th->th_num);
		th->state = 3;
	} else {
		printf("%d => finished.\n", th->th_num);
		th->state = 2;
	}
	return(NULL);
}

/*
 * Test driver.
 *
 * Recreates SQLITEDB with an empty test_table, optionally enables the shared
 * cache, starts NUM_THREADS detached worker threads, releases them together
 * once all have reported in, and then polls until each one has reached a
 * final state.
 *
 * Returns 0 if every thread finished successfully, 1 on setup failure or if
 * any thread could not be started or reported an error.
 */
int main(void)
{
	sqlite3 *db_conn = NULL;
	int rc, i;
	int retval = 0;
	struct stat myst;
	int trigger = 0;
	pthread_t thread;

	/* If the database already exists, lets go ahead and delete it */
	if (stat(SQLITEDB, &myst) != -1) {
		unlink(SQLITEDB);
	}

	rc = sqlite3_open(SQLITEDB , &db_conn);
	if (rc) {
		printf("sqlite3_open() failed: %s\n", sqlite3_errmsg(db_conn));
		sqlite3_close(db_conn);
		return(1);
	}

#ifdef SHARED_CACHE
	rc = sqlite3_enable_shared_cache(1);
	if (rc != SQLITE_OK) {
		printf("failed to enable shared cache\n");
		sqlite3_close(db_conn);
		return(1);
	} else
		printf("Shared Cache Enabled\n");
#endif

	/* Pre-create a table */
	printf("Creating a table\n");
	rc = sqlite3_execute(db_conn, "CREATE TABLE test_table(threadnum INT, cnt INT, testcol TEXT)", 0);
	if (rc != 1) {
		printf("failed to create table\n");
		sqlite3_close(db_conn);
		return(1);
	}

	sqlite3_close(db_conn);

	th_data = malloc(sizeof(*th_data)*NUM_THREADS);
	if (th_data == NULL) {
		printf("failed to allocate thread data\n");
		return(1);
	}

	/* Hold the mutex while the workers start; they block on it after
	 * opening their connection and are released together below. */
	pthread_mutex_init(&global_mutex, NULL);
	pthread_mutex_lock(&global_mutex);

	/* Start all threads */
	for (i=0; i<NUM_THREADS; i++) {
		th_data[i].th_num = i;
		th_data[i].state = 0;
		rc = pthread_create(&thread, NULL, testthread, &(th_data[i]));
		if (rc != 0) {
			/* Mark the slot as failed so neither wait loop below spins
			 * forever on a thread that never existed. */
			printf("%d => pthread_create() failed: %s\n", i, strerror(rc));
			th_data[i].state = 3;
			continue;
		}
		pthread_detach(thread);
	}

	/* Wait for all threads to start, then release mutex */
	trigger = 0;
	while (!trigger) {
		trigger = 1;
		for (i=0; i<NUM_THREADS; i++)
			if (!th_data[i].state)
				trigger = 0;

		usleep(50000);
	}
	printf("all threads started\n");
	pthread_mutex_unlock(&global_mutex);

	/* Wait for all threads to finish, then cleanup and exit */
	trigger = 0;
	while (!trigger) {
		trigger = 1;
		for (i=0; i<NUM_THREADS; i++)
			if (th_data[i].state < 2) {
				trigger = 0;
			} else if (th_data[i].state > 2) {
				retval = 1;
			}

		usleep(50000);
	}

	pthread_mutex_destroy(&global_mutex);
	free(th_data);
	printf("exiting...(test %s)\n", (retval==0)?"succeeded":"failed");
	return(retval);
}



-----------------------------------------------------------------------------
To unsubscribe, send email to [EMAIL PROTECTED]
-----------------------------------------------------------------------------

Reply via email to