On 05/02 11:18:19, Christophe Milard wrote:
> Hi Brian
>
> I have commented below, but it looks we need to talk. would you have time
> after the linaro sync call today, for a HO?
>
Hi Christophe,
Remaining comments from 'helper_abstraction_odp_thread' following our
discussion.
Brian
> diff --git a/helper/linux.c b/helper/linux.c
> index 24e243b..2714e51 100644
> --- a/helper/linux.c
> +++ b/helper/linux.c
> +/*
> + * wrapper for odpthreads, either implemented as linux threads or processes.
> + * (in process mode, if start_routine returns NULL, the process return 1.
> + */
> +static void *odpthread_run_start_routine(void *arg)
> {
> - odph_linux_thr_params_t *thr_params = arg;
> + int status;
> + int ret;
> + odph_odpthread_params_t *thr_params;
> +
> + _odph_odpthread_start_args_t *start_args = arg;
> +
> + thr_params = &start_args->thr_params;
>
> /* ODP thread local init */
> if (odp_init_local(thr_params->instance, thr_params->thr_type)) {
> ODPH_ERR("Local init failed\n");
> + if (start_args->linuxtype == PROCESS)
> + exit(-1);
We discussed the -1 return value. The _exit() vs exit() question is relevant,
but beyond the scope of this patch series.
> return NULL;
> }
>
> - void *ret_ptr = thr_params->start(thr_params->arg);
> - int ret = odp_term_local();
> + ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n",
> + thr_params->thr_type == ODP_THREAD_WORKER ?
> + "worker" : "control",
> + (start_args->linuxtype == PTHREAD) ?
> + "pthread" : "process",
> + (int)getpid());
for extra sanity, perhaps:
(start_args->linuxtype == PTHREAD) ? (int)pthread_self() : (int)getpid()
A potential RFC for the logging facilities (beyond the scope of this series)
would include the pid and thread_id in the log format alongside the timestamp.
> + status = thr_params->start(thr_params->arg);
> + ret = odp_term_local();
>
> if (ret < 0)
> ODPH_ERR("Local term failed\n");
> else if (ret == 0 && odp_term_global(thr_params->instance))
> ODPH_ERR("Global term failed\n");
>
> - return ret_ptr;
> + /* for process implementation of odp threads, just return status... */
> + if (start_args->linuxtype == PROCESS)
> + exit(status);
> +
> + /* threads implementation return void* pointers: cast status to that. */
> + return (void *)(long)status;
Seems the cast to long is unnecessary?
> }
>
> -int odph_linux_pthread_create(odph_linux_pthread_t *pthread_tbl,
> - const odp_cpumask_t *mask,
> - const odph_linux_thr_params_t *thr_params)
> +/*
> + * Create a single ODPthread as a linux process
> + */
> +static int odph_linux_process_create(_odph_odpthread_t *thread_tbl,
> + int cpu,
> + const odph_odpthread_params_t *thr_params)
> {
> - int i;
> - int num;
> - int cpu_count;
> - int cpu;
> int ret;
> + cpu_set_t cpu_set;
> + pid_t pid;
>
> - num = odp_cpumask_count(mask);
> + CPU_ZERO(&cpu_set);
> + CPU_SET(cpu, &cpu_set);
>
> - memset(pthread_tbl, 0, num * sizeof(odph_linux_pthread_t));
> + thread_tbl->start_args.thr_params = *thr_params; /* copy */
> + thread_tbl->start_args.linuxtype = PROCESS;
> + thread_tbl->cpu = cpu;
>
> - cpu_count = odp_cpu_count();
> + pid = fork();
> + if (pid < 0) {
> + ODPH_ERR("fork() failed\n");
> + thread_tbl->start_args.linuxtype = NOT_STARTED;
> + return -1;
> + }
>
> - if (num < 1 || num > cpu_count) {
> - ODPH_ERR("Invalid number of threads:%d (%d cores available)\n",
> - num, cpu_count);
> + /* Parent continues to fork */
> + if (pid > 0) {
> + thread_tbl->proc.pid = pid;
> return 0;
> }
>
> - cpu = odp_cpumask_first(mask);
> - for (i = 0; i < num; i++) {
> - cpu_set_t cpu_set;
> + /* Child process */
>
> - CPU_ZERO(&cpu_set);
> - CPU_SET(cpu, &cpu_set);
> + /* Request SIGTERM if parent dies */
> + prctl(PR_SET_PDEATHSIG, SIGTERM);
OK. It seems only the TM example does basic signal handling.
> + /* Parent died already? */
> + if (getppid() == 1)
> + kill(getpid(), SIGTERM);
OK.
> - pthread_attr_init(&pthread_tbl[i].attr);
> + if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) {
> + ODPH_ERR("sched_setaffinity() failed\n");
> + return -2;
OK.
> + }
>
> - pthread_tbl[i].cpu = cpu;
> + odpthread_run_start_routine(&thread_tbl->start_args);
>
> - pthread_attr_setaffinity_np(&pthread_tbl[i].attr,
> - sizeof(cpu_set_t), &cpu_set);
> + return 0; /* never reached */
> +}
>
> - pthread_tbl[i].thr_params.start = thr_params->start;
> - pthread_tbl[i].thr_params.arg = thr_params->arg;
> - pthread_tbl[i].thr_params.thr_type = thr_params->thr_type;
> - pthread_tbl[i].thr_params.instance = thr_params->instance;
> +/*
> + * Create a single ODPthread as a linux thread
> + */
> +static int odph_linux_thread_create(_odph_odpthread_t *thread_tbl,
> + int cpu,
> + const odph_odpthread_params_t *thr_params)
> +{
> + int ret;
> + cpu_set_t cpu_set;
>
> - ret = pthread_create(&pthread_tbl[i].thread,
> - &pthread_tbl[i].attr,
> - odp_run_start_routine,
> - &pthread_tbl[i].thr_params);
> - if (ret != 0) {
> - ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
> - break;
> - }
> + CPU_ZERO(&cpu_set);
> + CPU_SET(cpu, &cpu_set);
>
> - cpu = odp_cpumask_next(mask, cpu);
> - }
> + pthread_attr_init(&thread_tbl->thread.attr);
>
> - return i;
> -}
> + thread_tbl->cpu = cpu;
>
> -void odph_linux_pthread_join(odph_linux_pthread_t *thread_tbl, int num)
> -{
> - int i;
> - int ret;
> + pthread_attr_setaffinity_np(&thread_tbl->thread.attr,
> + sizeof(cpu_set_t), &cpu_set);
>
> - for (i = 0; i < num; i++) {
> - /* Wait thread to exit */
> - ret = pthread_join(thread_tbl[i].thread, NULL);
> - if (ret != 0) {
> - ODPH_ERR("Failed to join thread from cpu #%d\n",
> - thread_tbl[i].cpu);
> - }
> - pthread_attr_destroy(&thread_tbl[i].attr);
> + thread_tbl->start_args.thr_params = *thr_params; /* copy */
> + thread_tbl->start_args.linuxtype = PTHREAD;
> +
> + ret = pthread_create(&thread_tbl->thread.thread_id,
> + &thread_tbl->thread.attr,
> + odpthread_run_start_routine,
> + &thread_tbl->start_args);
> + if (ret != 0) {
> + ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
> + thread_tbl->start_args.linuxtype = NOT_STARTED;
> + return ret;
> }
> +
> + return 0;
> }
>
> -int odph_linux_process_fork_n(odph_linux_process_t *proc_tbl,
> - const odp_cpumask_t *mask,
> - const odph_linux_thr_params_t *thr_params)
> +/*
> + * create an odpthread set (as linux processes or linux threads or both)
> + */
> +int odph_odpthreads_create(odph_odpthread_tbl_t *thread_tbl_ptr,
> + const odp_cpumask_t *mask,
> + const odph_odpthread_params_t *thr_params)
> {
> - pid_t pid;
> + int i;
> int num;
> int cpu_count;
> int cpu;
> - int i;
> + _odph_odpthread_t *thread_tbl;
>
> num = odp_cpumask_count(mask);
>
> - memset(proc_tbl, 0, num * sizeof(odph_linux_process_t));
> + thread_tbl = malloc(num * sizeof(_odph_odpthread_t));
> + *thread_tbl_ptr = (void *)thread_tbl;
> +
> + memset(thread_tbl, 0, num * sizeof(_odph_odpthread_t));
>
> cpu_count = odp_cpu_count();
>
> if (num < 1 || num > cpu_count) {
> - ODPH_ERR("Bad num\n");
> + ODPH_ERR("Invalid number of odpthreads:%d"
> + " (%d cores available)\n",
> + num, cpu_count);
> return -1;
> }
>
> cpu = odp_cpumask_first(mask);
> for (i = 0; i < num; i++) {
> - cpu_set_t cpu_set;
> + /*
> + * Thread mode by default, or if both thread and proc mode
> + * are required each second odpthread is a linux thread.
> + */
> + if ((!helper_options.proc) ||
> + (helper_options.proc && helper_options.thrd && (i & 1))) {
> + if (odph_linux_thread_create(&thread_tbl[i],
> + cpu,
> + thr_params))
> + break;
> + } else {
> + if (odph_linux_process_create(&thread_tbl[i],
> + cpu,
> + thr_params))
> + break;
> + }
>
> - CPU_ZERO(&cpu_set);
> - CPU_SET(cpu, &cpu_set);
> + cpu = odp_cpumask_next(mask, cpu);
> + }
> + thread_tbl[num - 1].last = 1;
> - pid = fork();
> + return i;
> +}
>
> - if (pid < 0) {
> - ODPH_ERR("fork() failed\n");
> - return -1;
> - }
> +/*
> + * wait for the odpthreads terminaison (linux processes and threads)
parlez-vous francais?
> + */
> +int odph_odpthreads_join(odph_odpthread_tbl_t *thread_tbl_ptr)
> +{
> + _odph_odpthread_t *thread_tbl;
> + pid_t pid;
> + int i = 0;
> + int status = 0; /* child process return code (!=0 is error) */
> + void *thread_ret; /* "child" thread return code (NULL is error) */
> + int ret;
> + int retval = 0;
>
> - /* Parent continues to fork */
> - if (pid > 0) {
> - proc_tbl[i].pid = pid;
> - proc_tbl[i].cpu = cpu;
> + thread_tbl = (_odph_odpthread_t *)*thread_tbl_ptr;
> +
> + if (!thread_tbl) {
> + ODPH_ERR("Attempt to join thread from invalid table\n");
> + return -1;
> + }
>
> - cpu = odp_cpumask_next(mask, cpu);
> + /* joins linux threads or wait for processes */
> + do {
Since 'last' is set for the last element in the array, it is possible to
run across some elements which are NOT_STARTED. Perhaps you could switch()
against the 'linuxtype' field and handle NOT_STARTED by doing nothing and
logging a warning?
> + /* pthreads: */
> + if (thread_tbl[i].start_args.linuxtype == PTHREAD) {
Another subjective comment beyond the scope of this patch series is to have
a thread checker utility for asserting that create/join calls are made from
the same or master pid/thread, otherwise ...
> + /* Wait thread to exit */
> + ret = pthread_join(thread_tbl[i].thread.thread_id,
> + &thread_ret);
> + if (ret != 0) {
> + ODPH_ERR("Failed to join thread from cpu #%d\n",
> + thread_tbl[i].cpu);
> + retval = -1;
> + } else {
> + if (thread_ret != NULL)
> + retval = -1;
> + }
> + pthread_attr_destroy(&thread_tbl[i].thread.attr);
> continue;
> }
>
> - /* Child process */
> + /* processes: */
> + pid = waitpid(thread_tbl[i].proc.pid, &status, 0);
>
> - /* Request SIGTERM if parent dies */
> - prctl(PR_SET_PDEATHSIG, SIGTERM);
> - /* Parent died already? */
> - if (getppid() == 1)
> - kill(getpid(), SIGTERM);
> + if (pid < 0) {
> + ODPH_ERR("wait() failed\n");
waitpid() failed
> + retval = -1;
> + }
>
> - if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) {
> - ODPH_ERR("sched_setaffinity() failed\n");
> - return -2;
> + /* Examine the child process' termination status */
> + if (WIFEXITED(status) && WEXITSTATUS(status) != EXIT_SUCCESS) {
We're assuming that an ODP thread 'start' routine will return 0 / EXIT_SUCCESS
_and_ will return in the first place--no killing in this join.
> + ODPH_ERR("Child exit status:%d (pid:%d)\n",
> + WEXITSTATUS(status), (int)pid);
> + retval = -1;
> }
> + if (WIFSIGNALED(status)) {
> + int signo = WTERMSIG(status);
>
> - if (odp_init_local(thr_params->instance,
> - thr_params->thr_type)) {
> - ODPH_ERR("Local init failed\n");
> - return -2;
> + ODPH_ERR("Child term signo:%d - %s (pid:%d)\n",
> + signo, strsignal(signo), (int)pid);
> + retval = -1;
OK. Never mind. :)
> }
>
> - return 0;
> - }
> + } while (!thread_tbl[i++].last);
>
> - return 1;
> + /* free the allocated table: */
> + free(*thread_tbl_ptr);
> +
> + return (retval < 0) ? retval : i;
> }
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp