On 12 May 2016 at 10:17, Savolainen, Petri (Nokia - FI/Espoo) <
[email protected]> wrote:

>
>
> > -----Original Message-----
> > From: lng-odp [mailto:[email protected]] On Behalf Of
> > Christophe Milard
> > Sent: Wednesday, May 11, 2016 7:42 PM
> > To: [email protected]; [email protected]; lng-
> > [email protected]
> > Subject: [lng-odp] [PATCHv6 05/38] helpers: linux: creating functions to
> > handle odpthreads
> >
> > Two functions, odph_odpthreads_create and odph_odpthreads_join
> > are created to create and termindate odp threads.
> > These function will create the odp threads either as linux processes or
> > as linux threads, depending on the command line arguments (flag odph_proc
> > or odph_thread). If both flags are given, every second odp thread will be
> > process, (others are threads). default is thead only.
> > Former functions (odph_linux_pthread_create, odph_linux_pthread_join,
> > odph_linux_process_fork, odph_linux_process_fork_n and
> > odph_linux_process_wait_n) are left for a compatibility, but are aimed
> > to be removed.
> >
> > Signed-off-by: Christophe Milard <[email protected]>
> > ---
> >  helper/include/odp/helper/linux.h |  28 ++++
> >  helper/linux.c                    | 267
> > ++++++++++++++++++++++++++++++++++++++
> >  2 files changed, 295 insertions(+)
> >
> > diff --git a/helper/include/odp/helper/linux.h
> > b/helper/include/odp/helper/linux.h
> > index f99b88a..f67aa30 100644
> > --- a/helper/include/odp/helper/linux.h
> > +++ b/helper/include/odp/helper/linux.h
> > @@ -166,6 +166,34 @@ int odph_linux_process_fork_n(odph_linux_process_t
> > *proc_tbl,
> >   */
> >  int odph_linux_process_wait_n(odph_linux_process_t *proc_tbl, int num);
> >
> > +/**
> > + * Creates and launches odpthreads (as linux threads or processes)
> > + *
> > + * Creates, pins and launches threads to separate CPU's based on the
> > cpumask.
> > + *
> > + * @param thread_tbl    Thread table
> > + * @param mask          CPU mask
> > + * @param thr_params    ODP thread parameters
> > + *
> > + * @return Number of threads created
> > + */
> > +int odph_odpthreads_create(odph_odpthread_t *thread_tbl,
> > +                        const odp_cpumask_t *mask,
> > +                        const odph_odpthread_params_t *thr_params);
> > +
> > +/**
> > + * Waits odpthreads (as linux threads or processes) to exit.
> > + *
> > + * Returns when all odpthreads have terminated.
> > + *
> > + * @param thread_tbl    Thread table
> > + * @return The number of joined threads or -1 on error.
> > + * (error occurs if any of the start_routine return non-zero or if
> > + *  the thread join/process wait itself failed -e.g. as the result of a
> > kill)
> > + *
> > + */
> > +int odph_odpthreads_join(odph_odpthread_t *thread_tbl);
> > +
> >
> >  /**
> >   * Parse linux helper options
> > diff --git a/helper/linux.c b/helper/linux.c
> > index 5dbc2da..b8d4f49 100644
> > --- a/helper/linux.c
> > +++ b/helper/linux.c
> > @@ -22,6 +22,11 @@
> >  #include <odp/helper/linux.h>
> >  #include "odph_debug.h"
> >
> > +static struct {
> > +     int proc; /* true when process mode is required */
> > +     int thrd; /* true when thread mode is required */
> > +} helper_options;
> > +
> >  static void *odp_run_start_routine(void *arg)
> >  {
> >       odph_linux_thr_params_t *thr_params = arg;
> > @@ -239,6 +244,262 @@ int odph_linux_process_wait_n(odph_linux_process_t
> > *proc_tbl, int num)
> >  }
> >
> >  /*
> > + * wrapper for odpthreads, either implemented as linux threads or
> > processes.
> > + * (in process mode, if start_routine returns NULL, the process return
> > FAILURE).
> > + */
> > +static void *odpthread_run_start_routine(void *arg)
> > +{
> > +     int status;
> > +     int ret;
> > +     odph_odpthread_params_t *thr_params;
> > +
> > +     odph_odpthread_start_args_t *start_args = arg;
> > +
> > +     thr_params = &start_args->thr_params;
> > +
> > +     /* ODP thread local init */
> > +     if (odp_init_local(thr_params->instance, thr_params->thr_type)) {
> > +             ODPH_ERR("Local init failed\n");
> > +             if (start_args->linuxtype == ODPTHREAD_PROCESS)
> > +                     _exit(EXIT_FAILURE);
> > +             return NULL;
> > +     }
> > +
> > +     ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n",
> > +              thr_params->thr_type == ODP_THREAD_WORKER ?
> > +              "worker" : "control",
> > +              (start_args->linuxtype == ODPTHREAD_PTHREAD) ?
> > +              "pthread" : "process",
> > +              (int)getpid());
> > +
> > +     status = thr_params->start(thr_params->arg);
> > +     ret = odp_term_local();
> > +
> > +     if (ret < 0)
> > +             ODPH_ERR("Local term failed\n");
> > +     else if (ret == 0 && odp_term_global(thr_params->instance))
> > +             ODPH_ERR("Global term failed\n");
> > +
> > +     /* for process implementation of odp threads, just return status...
> > */
> > +     if (start_args->linuxtype == ODPTHREAD_PROCESS)
> > +             _exit(status);
> > +
> > +     /* threads implementation return void* pointers: cast status to
> > that. */
> > +     return (void *)(long)status;
> > +}
> > +
> > +/*
> > + * Create a single ODPthread as a linux process
> > + */
> > +static int odph_linux_process_create(odph_odpthread_t *thread_tbl,
> > +                                  int cpu,
> > +                                  const odph_odpthread_params_t
> *thr_params)
> > +{
> > +     int ret;
> > +     cpu_set_t cpu_set;
> > +     pid_t pid;
> > +
> > +     CPU_ZERO(&cpu_set);
> > +     CPU_SET(cpu, &cpu_set);
> > +
> > +     thread_tbl->start_args.thr_params    = *thr_params; /* copy */
> > +     thread_tbl->start_args.linuxtype     = ODPTHREAD_PROCESS;
> > +     thread_tbl->cpu = cpu;
> > +
> > +     pid = fork();
> > +     if (pid < 0) {
> > +             ODPH_ERR("fork() failed\n");
> > +             thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED;
> > +             return -1;
> > +     }
> > +
> > +     /* Parent continues to fork */
> > +     if (pid > 0) {
> > +             thread_tbl->proc.pid  = pid;
> > +             return 0;
> > +     }
> > +
> > +     /* Child process */
> > +
> > +     /* Request SIGTERM if parent dies */
> > +     prctl(PR_SET_PDEATHSIG, SIGTERM);
> > +     /* Parent died already? */
> > +     if (getppid() == 1)
> > +             kill(getpid(), SIGTERM);
> > +
> > +     if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) {
> > +             ODPH_ERR("sched_setaffinity() failed\n");
> > +             return -2;
> > +     }
> > +
> > +     odpthread_run_start_routine(&thread_tbl->start_args);
> > +
> > +     return 0; /* never reached */
> > +}
> > +
> > +/*
> > + * Create a single ODPthread as a linux thread
> > + */
> > +static int odph_linux_thread_create(odph_odpthread_t *thread_tbl,
> > +                                 int cpu,
> > +                                 const odph_odpthread_params_t
> *thr_params)
> > +{
> > +     int ret;
> > +     cpu_set_t cpu_set;
> > +
> > +     CPU_ZERO(&cpu_set);
> > +     CPU_SET(cpu, &cpu_set);
> > +
> > +     pthread_attr_init(&thread_tbl->thread.attr);
> > +
> > +     thread_tbl->cpu = cpu;
> > +
> > +     pthread_attr_setaffinity_np(&thread_tbl->thread.attr,
> > +                                 sizeof(cpu_set_t), &cpu_set);
> > +
> > +     thread_tbl->start_args.thr_params    = *thr_params; /* copy */
> > +     thread_tbl->start_args.linuxtype     = ODPTHREAD_PTHREAD;
> > +
> > +     ret = pthread_create(&thread_tbl->thread.thread_id,
> > +                          &thread_tbl->thread.attr,
> > +                          odpthread_run_start_routine,
> > +                          &thread_tbl->start_args);
> > +     if (ret != 0) {
> > +             ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
> > +             thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED;
> > +             return ret;
> > +     }
> > +
> > +     return 0;
> > +}
> > +
> > +/*
> > + * create an odpthread set (as linux processes or linux threads or both)
> > + */
> > +int odph_odpthreads_create(odph_odpthread_t *thread_tbl,
> > +                        const odp_cpumask_t *mask,
> > +                        const odph_odpthread_params_t *thr_params)
> > +{
> > +     int i;
> > +     int num;
> > +     int cpu_count;
> > +     int cpu;
> > +
> > +     num = odp_cpumask_count(mask);
> > +
> > +     memset(thread_tbl, 0, num * sizeof(odph_odpthread_t));
> > +
> > +     cpu_count = odp_cpu_count();
> > +
> > +     if (num < 1 || num > cpu_count) {
> > +             ODPH_ERR("Invalid number of odpthreads:%d"
> > +                      " (%d cores available)\n",
> > +                      num, cpu_count);
> > +             return -1;
> > +     }
> > +
> > +     cpu = odp_cpumask_first(mask);
> > +     for (i = 0; i < num; i++) {
> > +             /*
> > +              * Thread mode by default, or if both thread and proc mode
> > +              * are required each second odpthread is a linux thread.
> > +              */
>
>
> This is a weird logic. All threads should be one type (when you have
> boolean cmd line params for thread type). Pthread should be the default, so
> all threads are pthreads if user don't give any param. It's an error if
> user gives both.
>

This follows a requirement from you that the linux ODP implementation
should support a mix of both ODP threads implemented as pthread and process
at the same time. I felt that a "--odph_mixed" option is not more clear
than specifying both the things we want, i.e. processes and thread.
Any better suggestion?

>
>
>
> > +             if ((!helper_options.proc) ||
> > +                 (helper_options.proc && helper_options.thrd && (i &
> 1))) {
> > +                     if (odph_linux_thread_create(&thread_tbl[i],
> > +                                                  cpu,
> > +                                                  thr_params))
> > +                             break;
> > +              } else {
> > +                     if (odph_linux_process_create(&thread_tbl[i],
> > +                                                   cpu,
> > +                                                   thr_params))
> > +                             break;
> > +             }
> > +
> > +             cpu = odp_cpumask_next(mask, cpu);
> > +     }
> > +     thread_tbl[num - 1].last = 1;
> > +
> > +     return i;
> > +}
> > +
> > +/*
> > + * wait for the odpthreads termination (linux processes and threads)
> > + */
> > +int odph_odpthreads_join(odph_odpthread_t *thread_tbl)
> > +{
> > +     pid_t pid;
> > +     int i = 0;
> > +     int terminated = 0;
> > +     int status = 0;         /* child process return code (!=0 is error)
> > */
> > +     void *thread_ret;       /* "child" thread return code (NULL is
> error) */
> > +     int ret;
> > +     int retval = 0;
> > +
> > +     /* joins linux threads or wait for processes */
> > +     do {
> > +             /* pthreads: */
> > +             switch (thread_tbl[i].start_args.linuxtype) {
> > +             case ODPTHREAD_PTHREAD:
> > +                     /* Wait thread to exit */
> > +                     ret = pthread_join(thread_tbl[i].thread.thread_id,
> > +                                        &thread_ret);
> > +                     if (ret != 0) {
> > +                             ODPH_ERR("Failed to join thread from cpu
> #%d\n",
> > +                                      thread_tbl[i].cpu);
> > +                             retval = -1;
> > +                     } else {
> > +                             terminated++;
> > +                             if (thread_ret != NULL)
> > +                                     retval = -1;
> > +                     }
> > +                     pthread_attr_destroy(&thread_tbl[i].thread.attr);
> > +                     break;
> > +
> > +             case ODPTHREAD_PROCESS:
> > +
> > +                     /* processes: */
> > +                     pid = waitpid(thread_tbl[i].proc.pid, &status, 0);
> > +
> > +                     if (pid < 0) {
> > +                             ODPH_ERR("waitpid() failed\n");
> > +                             retval = -1;
> > +                             break;
> > +                     }
> > +
> > +                     terminated++;
> > +
> > +                     /* Examine the child process' termination status */
> > +                     if (WIFEXITED(status) &&
> > +                         WEXITSTATUS(status) != EXIT_SUCCESS) {
> > +                             ODPH_ERR("Child exit status:%d (pid:%d)\n",
> > +                                      WEXITSTATUS(status), (int)pid);
> > +                             retval = -1;
> > +                     }
> > +                     if (WIFSIGNALED(status)) {
> > +                             int signo = WTERMSIG(status);
> > +
> > +                             ODPH_ERR("Child term signo:%d - %s
> (pid:%d)\n",
> > +                                      signo, strsignal(signo),
> (int)pid);
> > +                             retval = -1;
> > +                     }
> > +                     break;
> > +
> > +             case ODPTHREAD_NOT_STARTED:
> > +                     ODPH_DBG("No join done on not started
> ODPthread.\n");
> > +                     break;
> > +             default:
> > +                     ODPH_DBG("Invalid case statement value!\n");
> > +                     break;
> > +             }
> > +
> > +     } while (!thread_tbl[i++].last);
> > +
> > +     return (retval < 0) ? retval : terminated;
> > +}
> > +
> > +/*
> >   * Parse command line options to extract options affecting helpers.
> >   */
> >  int odph_parse_options(int argc, char *argv[])
> > @@ -247,9 +508,15 @@ int odph_parse_options(int argc, char *argv[])
> >
> >       static struct option long_options[] = {
> >               /* These options set a flag. */
> > +             {"odph_proc",   no_argument, &helper_options.proc, 1},
> > +             {"odph_thread", no_argument, &helper_options.thrd, 1},
> >               {0, 0, 0, 0}
> >               };
> >
> > +     /* defaults: */
> > +     helper_options.proc = false;
> > +     helper_options.thrd = false;
>
>
> Pthread should be the default option. We should not always require a cmd
> line param from user (if no param ==> pthread).
>

We don't!. This structure reflects the options that getopts finds on the
command line: Before parsing the command line, no options have been found!.
The default IS pthread when no options are given.

Christophe.

>
> -Petri
>
>
>
>
> > +
> >       while (1) {
> >               /* getopt_long stores the option index here. */
> >               int option_index = 0;
> > --
> > 2.5.0
> >
> > _______________________________________________
> > lng-odp mailing list
> > [email protected]
> > https://lists.linaro.org/mailman/listinfo/lng-odp
>
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to