Today's Topics:

   1. Re: MonetDB: default - Low-level task scheduler.
      (Sjoerd Mullender)


----------------------------------------------------------------------

Message: 1
Date: Wed, 07 Nov 2012 17:17:55 +0100
From: Sjoerd Mullender <sjo...@monetdb.org>
To: developers-l...@monetdb.org
Subject: Re: MonetDB: default - Low-level task scheduler.
Message-ID: <509a89b3.6040...@monetdb.org>
Content-Type: text/plain; charset=UTF-8

There are a few things wrong with this code:

- sz = ((sz << 1) >> 1); does *not* turn sz into a multiple of two (as
suggested by the comment).  This statement basically is a no-op.
- when you create joinable threads, you should join them.
- it's not a great idea to use assert to make sure that GDKmalloc
succeeds.  It is better to return an error.
- The include of monet_options.h should be in the C file, not in the
include file (which would be included elsewhere where monet_options.h
will already be included).
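
To illustrate the first and third points, here is a minimal sketch in
plain C.  It is not the MonetDB code: struct queue_entry, queue_create,
shift_roundtrip and round_down_even are hypothetical names, calloc
stands in for GDKzalloc, and I am assuming that "a multiple of 2" in the
comment really means an even count.

```c
#include <assert.h>
#include <stdlib.h>

/* The statement from the patch.  For a non-negative sz whose left shift
 * does not overflow, the two shifts cancel and sz comes back unchanged. */
static int shift_roundtrip(int sz)
{
    return (sz << 1) >> 1;
}

/* Round down to a multiple of two by clearing the lowest bit. */
static int round_down_even(int sz)
{
    return sz & ~1;
}

/* Hypothetical queue entry, standing in for MRqueue. */
struct queue_entry {
    void **tasks;
    int index;
    int size;
};

/* Allocate a zeroed queue of sz entries, rounded down to an even count.
 * Returns 0 on success and -1 on failure, so the caller can report the
 * error instead of the process aborting. */
static int queue_create(struct queue_entry **out, int sz)
{
    struct queue_entry *q;

    if (out == NULL || sz < 2)
        return -1;
    sz = round_down_even(sz);
    assert((sz & 1) == 0);  /* internal invariant: fine to assert */
    q = calloc((size_t) sz, sizeof(*q));
    if (q == NULL)          /* runtime failure: report, do not assert */
        return -1;
    *out = q;
    return 0;
}
```

The distinction in queue_create is the one I would keep: assert for
things that can only go wrong through a programming error, an error
return for things that can fail at run time, such as allocation.  With
NDEBUG defined the assert disappears entirely, so an allocation check
written as an assert would vanish in a release build.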

On 2012-11-07 13:03, Martin Kersten wrote:
> Changeset: 5ff3c16e865f for MonetDB
> URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5ff3c16e865f
> Added Files:
>       gdk/gdk_mapreduce.c
>       gdk/gdk_mapreduce.h
> Modified Files:
>       gdk/Makefile.ag
>       monetdb5/modules/mal/groups.c
> Branch: default
> Log Message:
> 
> Low-level task scheduler.
> This module provide a lightweight map-reduce scheduler for multicore systems.
> A limited number of workers are initialized upfront, which take the tasks
> from a central queue. The header of these task descriptors should comply
> with the MRtask structure.
> 
> 
> diffs (239 lines):
> 
> diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag
> --- a/gdk/Makefile.ag
> +++ b/gdk/Makefile.ag
> @@ -36,7 +36,7 @@ lib_gdk = {
>               gdk_private.h gdk_delta.h gdk_logger.h gdk_posix.h \
>               gdk_system.h gdk_tm.h gdk_storage.h \
>               gdk_calc.c gdk_calc.h gdk_calc_compare.h gdk_calc_private.h \
> -             gdk_aggr.c gdk_group.c \
> +             gdk_aggr.c gdk_group.c gdk_mapreduce.c gdk_mapreduce.h \
>               bat.feps bat1.feps bat2.feps \
>               libbat.rc
>       LIBS = ../common/options/libmoptions \
> diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c
> new file mode 100644
> --- /dev/null
> +++ b/gdk/gdk_mapreduce.c
> @@ -0,0 +1,141 @@
> +/*
> + * The contents of this file are subject to the MonetDB Public License
> + * Version 1.1 (the "License"); you may not use this file except in
> + * compliance with the License. You may obtain a copy of the License at
> + * http://www.monetdb.org/Legal/MonetDBLicense
> + *
> + * Software distributed under the License is distributed on an "AS IS"
> + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
> + * License for the specific language governing rights and limitations
> + * under the License.
> + *
> + * The Original Code is the MonetDB Database System.
> + *
> + * The Initial Developer of the Original Code is CWI.
> + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
> + * Copyright August 2008-2012 MonetDB B.V.
> + * All Rights Reserved.
> + */
> +
> +/*
> + * (co)  Martin L. Kersten 
> + * This module provide a lightweight map-reduce scheduler for multicore systems.
> + * A limited number of workers are initialized upfront, which take the tasks
> + * from a central queue. The header of these task descriptors should comply
> + * with the MRtask structure.
> + *
> + */
> +#include "monetdb_config.h"
> +#include "gdk.h"
> +#include "gdk_mapreduce.h"
> +
> +/* each entry in the queue contains a list of tasks */
> +typedef struct MRQUEUE {
> +    MRtask **tasks;
> +     int index;      /* next available task */
> +     int size;       /* number of tasks */
> +} MRqueue;
> +
> +static MRqueue *mrqueue;
> +static int mrqsize= -1;   /* size of queue */
> +static int mrqlast= -1;   
> +static MT_Lock mrqlock;  /* its a shared resource, ie we need locks */
> +static MT_Sema mrqsema;  /* threads wait on empty queues */
> +
> +
> +static void MRworker(void *);
> +
> +static void
> +MRqueueCreate(int sz)
> +{
> +     int i;
> +     MT_Id tid;
> +
> +     MT_lock_init(&mrqlock, "q_create");
> +     MT_lock_set(&mrqlock,"q_create");
> +     MT_sema_init(&mrqsema, 0, "q_create");
> +     sz = ((sz << 1) >> 1); /* we want a multiple of 2 */
> +     mrqueue = (MRqueue*)GDKzalloc(sizeof(MRqueue) *sz);
> +     assert(mrqueue);
> +     mrqsize = sz;
> +     mrqlast = 0;
> +     /* create a worker thread for each core as specified as system parameter*/
> +     for ( i =0; i < GDKnr_threads; i++)
> +             MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_JOINABLE);
> +     MT_lock_unset(&mrqlock,"q_create");
> +}
> +
> +static void
> +MRenqueue(int taskcnt, MRtask **tasks)
> +{
> +     assert(taskcnt > 0);
> +    MT_lock_set(&mrqlock, "mrqlock");
> +    if (mrqlast == mrqsize) {
> +        mrqsize <<= 1;
> +        mrqueue = (MRqueue*) GDKrealloc(mrqueue, sizeof(MRqueue) * mrqsize);
> +    }
> +     mrqueue[mrqlast].index = 0;
> +     mrqueue[mrqlast].tasks = tasks;
> +     mrqueue[mrqlast].size = taskcnt;
> +    mrqlast++;
> +    MT_lock_unset(&mrqlock, "mrqlock");
> +     /* a task list is added for consumption*/
> +     while (taskcnt-- > 0)
> +             MT_sema_up(&mrqsema, "mrqsema");
> +}
> +
> +static MRtask *
> +MRdequeue(void)
> +{
> +    MRtask *r = NULL;
> +     int idx;
> +
> +    MT_sema_down(&mrqsema, "mrqsema");
> +    assert(mrqlast);
> +    MT_lock_set(&mrqlock, "mrqlock");
> +    if (mrqlast > 0) {
> +             idx = mrqueue[mrqlast-1].index;
> +        r = mrqueue[mrqlast-1].tasks[idx++];
> +             if ( mrqueue[mrqlast-1].size == idx)
> +                     mrqlast--;
> +             else 
> +                     mrqueue[mrqlast-1].index = idx;
> +     }
> +    MT_lock_unset(&mrqlock, "mrqlock");
> +    assert(r);
> +    return r;
> +}
> +
> +static void
> +MRworker(void * arg)
> +{
> +     MRtask *task;
> +     (void) arg;
> +     do{
> +             task= MRdequeue();
> +             (task->cmd)(task);
> +             MT_sema_up(task->sema, "mrqsema");
> +     } while (1);
> +}
> +
> +/* schedule the tasks and return when all are done */
> +void
> +MRschedule(int taskcnt, void **arg, void (*cmd)(void*p))
> +{
> +     int i;
> +     MT_Sema sema;
> +     MRtask **task = (MRtask**) arg;
> +
> +     if ( mrqueue == 0)
> +             MRqueueCreate(1024);
> +     
> +     MT_sema_init(&sema, 0, "q_create");
> +     for ( i= 0; i < taskcnt; i++){
> +             task[i]->sema = & sema;
> +             task[i]->cmd = cmd;
> +     }
> +     MRenqueue(taskcnt,task);
> +     /* waiting for all report result */
> +     for ( i= 0; i < taskcnt; i++)
> +             MT_sema_down(&sema, "mrqsema");
> +}
> diff --git a/gdk/gdk_mapreduce.h b/gdk/gdk_mapreduce.h
> new file mode 100644
> --- /dev/null
> +++ b/gdk/gdk_mapreduce.h
> @@ -0,0 +1,32 @@
> +/*
> + * The contents of this file are subject to the MonetDB Public License
> + * Version 1.1 (the "License"); you may not use this file except in
> + * compliance with the License. You may obtain a copy of the License at
> + * http://www.monetdb.org/Legal/MonetDBLicense
> + *
> + * Software distributed under the License is distributed on an "AS IS"
> + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
> + * License for the specific language governing rights and limitations
> + * under the License.
> + *
> + * The Original Code is the MonetDB Database System.
> + *
> + * The Initial Developer of the Original Code is CWI.
> + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
> + * Copyright August 2008-2012 MonetDB B.V.
> + * All Rights Reserved.
> + */
> +
> +#ifndef _GDK_MAPREDUCE_H_
> +#define _GDK_MAPREDUCE_H_
> +
> +#include <monet_options.h>
> +
> +typedef struct{
> +    MT_Sema  *sema;                  /* micro scheduler handle */
> +    void (*cmd)(void *);     /* the function to be executed */
> +}MRtask;
> +
> +gdk_export void MRschedule(int taskcnt, void **arg, void (*cmd)(void *p));
> +
> +#endif /* _GDK_MAPREDUCE_H_ */
> diff --git a/monetdb5/modules/mal/groups.c b/monetdb5/modules/mal/groups.c
> --- a/monetdb5/modules/mal/groups.c
> +++ b/monetdb5/modules/mal/groups.c
> @@ -66,11 +66,15 @@ GRPmulticolumngroup(Client cntxt, MalBlk
>       /* sort order may have influences */
>       /* SF100 Q16 showed < ordering is 2 times faster as > ordering */
>       for ( i = 3; i< pci->argc; i++)
> -     for ( j = i+1; j<pci->argc; j++)
> -     if ( sizes[j] < sizes[i]){
> -             l = sizes[j]; sizes[j]= sizes[i]; sizes[i]= l;
> -             bi = bid[j]; bid[j]= bid[i]; bid[i]= bi;
> -     }
> +             for ( j = i+1; j<pci->argc; j++)
> +                     if ( sizes[j] < sizes[i]){
> +                             l = sizes[j];
> +                             sizes[j]= sizes[i];
> +                             sizes[i]= l;
> +                             bi = bid[j];
> +                             bid[j]= bid[i];
> +                             bid[i]= bi;
> +                     }
>       /* for (i=2; i<pci->argc; i++)
>               mnstr_printf(cntxt->fdout,"# after [%d] "LLFMT"\n",i, sizes[i]); */
>  
> @@ -82,8 +86,6 @@ GRPmulticolumngroup(Client cntxt, MalBlk
>       i = 4;
>       if (msg == MAL_SUCCEED && pci->argc > 4 )
>       do {
> -             if (*ext) 
> -                     BBPdecref(*ext, TRUE);
>               /* early break when there are as many groups as histogram entries */
>               b = BATdescriptor(*hist);
>               if (  b ){
> @@ -91,8 +93,8 @@ GRPmulticolumngroup(Client cntxt, MalBlk
>                       BBPreleaseref(*hist);
>                       if ( j) break;
>               }
> -             if (*hist) 
> -                     BBPdecref(*hist, TRUE);
> +             BBPdecref(*ext, TRUE);
> +             BBPdecref(*hist, TRUE);
>               
>               /* (grp,ext,hist) := group.subgroupdone(arg,grp) */
>               oldgrp= *grp;


-- 
Sjoerd Mullender

