On 05/02 11:18:19, Christophe Milard wrote:
> Hi Brian
> 
> I have commented below, but it looks we need to talk.  would you have time
> after the linaro sync call today, for a HO?
> 

Hi Christophe,

Here are my remaining comments on 'helper_abstraction_odp_thread', following
our discussion.

Brian

> diff --git a/helper/linux.c b/helper/linux.c
> index 24e243b..2714e51 100644
> --- a/helper/linux.c
> +++ b/helper/linux.c
> +/*
> + * wrapper for odpthreads, either implemented as linux threads or processes.
> + * (in process mode, if start_routine returns NULL, the process return 1.
> + */
> +static void *odpthread_run_start_routine(void *arg)
>  {
> -     odph_linux_thr_params_t *thr_params = arg;
> +     int status;
> +     int ret;
> +     odph_odpthread_params_t *thr_params;
> +
> +     _odph_odpthread_start_args_t *start_args = arg;
> +
> +     thr_params = &start_args->thr_params;
>  
>       /* ODP thread local init */
>       if (odp_init_local(thr_params->instance, thr_params->thr_type)) {
>               ODPH_ERR("Local init failed\n");
> +             if (start_args->linuxtype == PROCESS)
> +                     exit(-1);

We discussed the -1 return value. Whether to call _exit() rather than exit()
here is also relevant, but beyond the scope of this patch series.

>               return NULL;
>       }
>  
> -     void *ret_ptr = thr_params->start(thr_params->arg);
> -     int ret = odp_term_local();
> +     ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n",
> +              thr_params->thr_type == ODP_THREAD_WORKER ?
> +              "worker" : "control",
> +              (start_args->linuxtype == PTHREAD) ?
> +              "pthread" : "process",
> +              (int)getpid());

for extra sanity, perhaps:

  (start_args->linuxtype == PTHREAD) ? (int)pthread_self() : (int)getpid()

A potential RFC for the logging facilities (beyond the scope of this series)
would include the pid and thread id in the log format alongside the timestamp.

> +     status = thr_params->start(thr_params->arg);
> +     ret = odp_term_local();
>  
>       if (ret < 0)
>               ODPH_ERR("Local term failed\n");
>       else if (ret == 0 && odp_term_global(thr_params->instance))
>               ODPH_ERR("Global term failed\n");
>  
> -     return ret_ptr;
> +     /* for process implementation of odp threads, just return status... */
> +     if (start_args->linuxtype == PROCESS)
> +             exit(status);
> +
> +     /* threads implementation return void* pointers: cast status to that. */
> +     return (void *)(long)status;

Seems the cast to long is unnecessary?

>  }
>  
> -int odph_linux_pthread_create(odph_linux_pthread_t *pthread_tbl,
> -                           const odp_cpumask_t *mask,
> -                           const odph_linux_thr_params_t *thr_params)
> +/*
> + * Create a single ODPthread as a linux process
> + */
> +static int odph_linux_process_create(_odph_odpthread_t *thread_tbl,
> +                                  int cpu,
> +                                  const odph_odpthread_params_t *thr_params)
>  {
> -     int i;
> -     int num;
> -     int cpu_count;
> -     int cpu;
>       int ret;
> +     cpu_set_t cpu_set;
> +     pid_t pid;
>  
> -     num = odp_cpumask_count(mask);
> +     CPU_ZERO(&cpu_set);
> +     CPU_SET(cpu, &cpu_set);
>  
> -     memset(pthread_tbl, 0, num * sizeof(odph_linux_pthread_t));
> +     thread_tbl->start_args.thr_params    = *thr_params; /* copy */
> +     thread_tbl->start_args.linuxtype     = PROCESS;
> +     thread_tbl->cpu = cpu;
>  
> -     cpu_count = odp_cpu_count();
> +     pid = fork();
> +     if (pid < 0) {
> +             ODPH_ERR("fork() failed\n");
> +             thread_tbl->start_args.linuxtype = NOT_STARTED;
> +             return -1;
> +     }
>  
> -     if (num < 1 || num > cpu_count) {
> -             ODPH_ERR("Invalid number of threads:%d (%d cores available)\n",
> -                      num, cpu_count);
> +     /* Parent continues to fork */
> +     if (pid > 0) {
> +             thread_tbl->proc.pid  = pid;
>               return 0;
>       }
>  
> -     cpu = odp_cpumask_first(mask);
> -     for (i = 0; i < num; i++) {
> -             cpu_set_t cpu_set;
> +     /* Child process */
>  
> -             CPU_ZERO(&cpu_set);
> -             CPU_SET(cpu, &cpu_set);
> +     /* Request SIGTERM if parent dies */
> +     prctl(PR_SET_PDEATHSIG, SIGTERM);

OK. It seems only the TM example does any basic signal handling.

> +     /* Parent died already? */
> +     if (getppid() == 1)
> +             kill(getpid(), SIGTERM);

OK.

> -             pthread_attr_init(&pthread_tbl[i].attr);
> +     if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) {
> +             ODPH_ERR("sched_setaffinity() failed\n");
> +             return -2;

OK.

> +     }
>  
> -             pthread_tbl[i].cpu = cpu;
> +     odpthread_run_start_routine(&thread_tbl->start_args);
>  
> -             pthread_attr_setaffinity_np(&pthread_tbl[i].attr,
> -                                         sizeof(cpu_set_t), &cpu_set);
> +     return 0; /* never reached */
> +}
>  
> -             pthread_tbl[i].thr_params.start    = thr_params->start;
> -             pthread_tbl[i].thr_params.arg      = thr_params->arg;
> -             pthread_tbl[i].thr_params.thr_type = thr_params->thr_type;
> -             pthread_tbl[i].thr_params.instance = thr_params->instance;
> +/*
> + * Create a single ODPthread as a linux thread
> + */
> +static int odph_linux_thread_create(_odph_odpthread_t *thread_tbl,
> +                                 int cpu,
> +                                 const odph_odpthread_params_t *thr_params)
> +{
> +     int ret;
> +     cpu_set_t cpu_set;
>  
> -             ret = pthread_create(&pthread_tbl[i].thread,
> -                                  &pthread_tbl[i].attr,
> -                                  odp_run_start_routine,
> -                                  &pthread_tbl[i].thr_params);
> -             if (ret != 0) {
> -                     ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
> -                     break;
> -             }
> +     CPU_ZERO(&cpu_set);
> +     CPU_SET(cpu, &cpu_set);
>  
> -             cpu = odp_cpumask_next(mask, cpu);
> -     }
> +     pthread_attr_init(&thread_tbl->thread.attr);
>  
> -     return i;
> -}
> +     thread_tbl->cpu = cpu;
>  
> -void odph_linux_pthread_join(odph_linux_pthread_t *thread_tbl, int num)
> -{
> -     int i;
> -     int ret;
> +     pthread_attr_setaffinity_np(&thread_tbl->thread.attr,
> +                                 sizeof(cpu_set_t), &cpu_set);
>  
> -     for (i = 0; i < num; i++) {
> -             /* Wait thread to exit */
> -             ret = pthread_join(thread_tbl[i].thread, NULL);
> -             if (ret != 0) {
> -                     ODPH_ERR("Failed to join thread from cpu #%d\n",
> -                              thread_tbl[i].cpu);
> -             }
> -             pthread_attr_destroy(&thread_tbl[i].attr);
> +     thread_tbl->start_args.thr_params    = *thr_params; /* copy */
> +     thread_tbl->start_args.linuxtype     = PTHREAD;
> +
> +     ret = pthread_create(&thread_tbl->thread.thread_id,
> +                          &thread_tbl->thread.attr,
> +                          odpthread_run_start_routine,
> +                          &thread_tbl->start_args);
> +     if (ret != 0) {
> +             ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
> +             thread_tbl->start_args.linuxtype = NOT_STARTED;
> +             return ret;
>       }
> +
> +     return 0;
>  }
>  
> -int odph_linux_process_fork_n(odph_linux_process_t *proc_tbl,
> -                           const odp_cpumask_t *mask,
> -                           const odph_linux_thr_params_t *thr_params)
> +/*
> + * create an odpthread set (as linux processes or linux threads or both)
> + */
> +int odph_odpthreads_create(odph_odpthread_tbl_t *thread_tbl_ptr,
> +                        const odp_cpumask_t *mask,
> +                        const odph_odpthread_params_t *thr_params)
>  {
> -     pid_t pid;
> +     int i;
>       int num;
>       int cpu_count;
>       int cpu;
> -     int i;
> +     _odph_odpthread_t *thread_tbl;
>  
>       num = odp_cpumask_count(mask);
>  
> -     memset(proc_tbl, 0, num * sizeof(odph_linux_process_t));
> +     thread_tbl = malloc(num * sizeof(_odph_odpthread_t));
> +     *thread_tbl_ptr = (void *)thread_tbl;
> +
> +     memset(thread_tbl, 0, num * sizeof(_odph_odpthread_t));
>  
>       cpu_count = odp_cpu_count();
>  
>       if (num < 1 || num > cpu_count) {
> -             ODPH_ERR("Bad num\n");
> +             ODPH_ERR("Invalid number of odpthreads:%d"
> +                      " (%d cores available)\n",
> +                      num, cpu_count);
>               return -1;
>       }
>  
>       cpu = odp_cpumask_first(mask);
>       for (i = 0; i < num; i++) {
> -             cpu_set_t cpu_set;
> +             /*
> +              * Thread mode by default, or if both thread and proc mode
> +              * are required each second odpthread is a linux thread.
> +              */
> +             if ((!helper_options.proc) ||
> +                 (helper_options.proc && helper_options.thrd && (i & 1))) {
> +                     if (odph_linux_thread_create(&thread_tbl[i],
> +                                                  cpu,
> +                                                  thr_params))
> +                             break;
> +              } else {
> +                     if (odph_linux_process_create(&thread_tbl[i],
> +                                                   cpu,
> +                                                   thr_params))
> +                             break;
> +             }
>  
> -             CPU_ZERO(&cpu_set);
> -             CPU_SET(cpu, &cpu_set);
> +             cpu = odp_cpumask_next(mask, cpu);
> +     }
> +     thread_tbl[num - 1].last = 1;
> -             pid = fork();
> +     return i;
> +}
>  
> -             if (pid < 0) {
> -                     ODPH_ERR("fork() failed\n");
> -                     return -1;
> -             }
> +/*
> + * wait for the odpthreads terminaison (linux processes and threads)

parlez-vous francais? (typo: s/terminaison/termination/)

> + */
> +int odph_odpthreads_join(odph_odpthread_tbl_t *thread_tbl_ptr)
> +{
> +     _odph_odpthread_t *thread_tbl;
> +     pid_t pid;
> +     int i = 0;
> +     int status = 0;         /* child process return code (!=0 is error) */
> +     void *thread_ret;       /* "child" thread return code (NULL is error) */
> +     int ret;
> +     int retval = 0;
>  
> -             /* Parent continues to fork */
> -             if (pid > 0) {
> -                     proc_tbl[i].pid  = pid;
> -                     proc_tbl[i].cpu = cpu;
> +     thread_tbl = (_odph_odpthread_t *)*thread_tbl_ptr;
> +
> +     if (!thread_tbl) {
> +             ODPH_ERR("Attempt to join thread from invalid table\n");
> +             return -1;
> +     }
>  
> -                     cpu = odp_cpumask_next(mask, cpu);
> +     /* joins linux threads or wait for processes */
> +     do {

Since 'last' is always set on the last element in the array, even when the
create loop breaks out early, it is possible to run across some elements
which are NOT_STARTED. Perhaps you could switch() on the 'linuxtype' field
and handle NOT_STARTED by doing nothing and logging a warning?

> +             /* pthreads: */
> +             if (thread_tbl[i].start_args.linuxtype == PTHREAD) {

Another subjective comment, beyond the scope of this patch series: it would be
useful to have a thread checker utility for asserting that create/join calls
are made from the same or the master pid/thread.

> +                     /* Wait thread to exit */
> +                     ret = pthread_join(thread_tbl[i].thread.thread_id,
> +                                        &thread_ret);
> +                     if (ret != 0) {
> +                             ODPH_ERR("Failed to join thread from cpu #%d\n",
> +                                      thread_tbl[i].cpu);
> +                             retval = -1;
> +                     } else {
> +                             if (thread_ret != NULL)
> +                                     retval = -1;
> +                     }
> +                     pthread_attr_destroy(&thread_tbl[i].thread.attr);
>                       continue;
>               }
>  
> -             /* Child process */
> +             /* processes: */
> +             pid = waitpid(thread_tbl[i].proc.pid, &status, 0);
>  
> -             /* Request SIGTERM if parent dies */
> -             prctl(PR_SET_PDEATHSIG, SIGTERM);
> -             /* Parent died already? */
> -             if (getppid() == 1)
> -                     kill(getpid(), SIGTERM);
> +             if (pid < 0) {
> +                     ODPH_ERR("wait() failed\n");

The message should read "waitpid() failed", since waitpid() is what is called.

> +                     retval = -1;
> +             }
>  
> -             if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) {
> -                     ODPH_ERR("sched_setaffinity() failed\n");
> -                     return -2;
> +             /* Examine the child process' termination status */
> +             if (WIFEXITED(status) && WEXITSTATUS(status) != EXIT_SUCCESS) {

We're assuming that an ODP thread 'start' routine will return 0 / EXIT_SUCCESS
_and_ will return in the first place--no killing in this join.

> +                     ODPH_ERR("Child exit status:%d (pid:%d)\n",
> +                              WEXITSTATUS(status), (int)pid);
> +                     retval = -1;
>               }
> +             if (WIFSIGNALED(status)) {
> +                     int signo = WTERMSIG(status);
>  
> -             if (odp_init_local(thr_params->instance,
> -                                thr_params->thr_type)) {
> -                     ODPH_ERR("Local init failed\n");
> -                     return -2;
> +                     ODPH_ERR("Child term signo:%d - %s (pid:%d)\n",
> +                              signo, strsignal(signo), (int)pid);
> +                     retval = -1;

OK. Never mind. :)

>               }
>  
> -             return 0;
> -     }
> +     } while (!thread_tbl[i++].last);
>  
> -     return 1;
> +     /* free the allocated table: */
> +     free(*thread_tbl_ptr);
> +
> +     return (retval < 0) ? retval : i;
>  }
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to