This isn't using the recommended backend (kqueue) method, but here is an
example I created that uses the threadpool behavior; you may be able to adapt
it to your needs - see attached.
Hope it helps.
On Wednesday, January 22, 2014 5:51:46 PM UTC-7, Ramesh Rayaprolu wrote:
>
> I am trying to do async File IO using libuv (uv_fs_* api's).
>
> I need to embed this File IO into an application which uses libevent and
> has its own main-loop.
>
> I was looking for such examples, and did not find any. Please let me know
> if someone has it.
>
> The problem that I am facing is like this --
> uv_fs_open, uv_fs_read have their own callbacks (which use uv_fs_t * as
> argument).
> But libevent has its own callback and that call back cannot get this
> "uv_fs_t *"
>
> So, whenever I add the uv_backend_fd to libevent, and as soon as I do
> uv_fs_open, I reach the libevent callback, and I see that the uv_fs_t * is
> not yet updated.
> Thus, I cannot get the uv_file (fd) to further read/write on this.
>
> Any help or suggestions on this is greatly appreciated.
>
> Thanks and Best Regards,
> Ramesh
>
>
--
You received this message because you are subscribed to the Google Groups
"libuv" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/libuv.
For more options, visit https://groups.google.com/groups/opt_out.
#include <uv.h>
#include <ev.h>
#include <stdlib.h>
#include <memory.h>
#include <stdio.h>
#include <unistd.h>
// Marks a parameter as intentionally unused by taking its address and
// discarding the result, which silences unused-parameter warnings.
#define UNUSED_PARAM(p) ((void)&(p))
// Held around the job start/finish printf calls and the job_num increment.
static uv_mutex_t lock;
// Number handed to the next started job; incremented in do_work under lock.
static int job_num = 0;
// Thread id recorded at the top of main; the callbacks compare their own
// thread id against it to report whether they are running on the main thread.
static unsigned long primordial_thread;
// Queued work task to complete.
//
// Work callback passed to uv_queue_work(). Takes the next job number while
// holding `lock` and logs it together with the id of the executing thread.
// The request carries no payload in this example, so `req` is not used.
static void do_work(uv_work_t* req)
{
  UNUSED_PARAM(req);

  uv_mutex_lock(&lock);
  // uv_thread_t is not guaranteed to be unsigned long on every platform;
  // convert explicitly so the argument really matches %lu.
  printf("Started job %d on thread id THREADPOOL (%lu)\n", job_num++,
         (unsigned long)uv_thread_self());
  uv_mutex_unlock(&lock);
}
// Finished task notifier.
//
// After-work callback passed to uv_queue_work(). Logs whether it is running
// on the thread recorded in primordial_thread, then releases the request.
//
// req:    the request allocated by the queuing side; this callback owns it
//         and frees it.
// status: completion status supplied by libuv; not inspected here.
static void finish_work(uv_work_t* req, int status)
{
  UNUSED_PARAM(status);

  // Read the id once and convert it explicitly: uv_thread_t is not
  // guaranteed to be unsigned long, so both the comparison and the %lu
  // argument need a well-defined integer value.
  const unsigned long self = (unsigned long)uv_thread_self();

  uv_mutex_lock(&lock);
  printf("Finished job %s thread id (%lu)\n",
         (self == primordial_thread) ? "on MAIN" : "off main",
         self);
  uv_mutex_unlock(&lock);

  free(req);
}
// UV worker thread
// Queues 8 jobs / 10 seconds
//
// Thread entry point. `data` is the uv_loop_t* to queue work onto. Runs
// forever: each cycle queues a burst of 8 requests (do_work as the job,
// finish_work as the completion callback) and then sleeps for 10 seconds.
// Each successfully queued request is freed by finish_work; a request that
// could not be queued is freed here.
//
// NOTE(review): libuv documents its loop API as not thread-safe apart from
// uv_async_send(). Calling uv_queue_work() from this thread while another
// thread runs uv_run() on the same loop looks like a race - confirm, and
// consider waking the loop thread with uv_async_send() and queuing there.
static void worker_uv(void *data)
{
  uv_loop_t *loop = data;
  int next_job = 0;

  for (;;) {
    // queue up a bunch rapidly
    for (int j = 0; j < 8; ++j) {
      printf("Queuing job %d from thread WORKER %lu\n", next_job++,
             (unsigned long)uv_thread_self());

      // Zero-initialized request; calloc replaces the malloc+memset pair.
      uv_work_t *work_req = calloc(1, sizeof *work_req);
      if (work_req == NULL) {
        fprintf(stderr, "Out of memory while queuing job\n");
        break;  // give up on this burst and retry after the sleep
      }

      int rc = uv_queue_work(loop, work_req, do_work, finish_work);
      if (rc != 0) {
        // finish_work will never run for this request, so release it here.
        fprintf(stderr, "uv_queue_work failed (%d)\n", rc);
        free(work_req);
      }
    }
    sleep(10);
  }
}
#if defined TI_TIMER
// libev timer callback (TI_TIMER variant).
//
// Logs whether it is running on the thread recorded in primordial_thread,
// then runs one non-blocking pass of the libuv loop stored in timer->data.
//
// loop, revents: unused.
// timer:         watcher whose `data` member holds the uv_loop_t* to turn.
static void loop_timer_ev_cb(struct ev_loop *loop, struct ev_timer *timer,
                             int revents)
{
  UNUSED_PARAM(loop);
  UNUSED_PARAM(revents);

  // uv_thread_t is not guaranteed to be an integer type; convert it before
  // comparing with the unsigned long recorded in main.
  const int on_main = ((unsigned long)uv_thread_self() == primordial_thread);
  printf("Checkging ev loop at timer cycle on thread %s\n",
         on_main ? "MAIN" : "OFF MAIN");

  uv_loop_t *loop_uv = timer->data;
  uv_run(loop_uv, UV_RUN_NOWAIT);
}
#elif defined TI_IOPIPE
// libev I/O callback (TI_IOPIPE variant).
//
// Logs whether it is running on the thread recorded in primordial_thread,
// then runs one non-blocking pass of the libuv loop stored in io->data.
//
// loop, revents: unused.
// io:            watcher whose `data` member holds the uv_loop_t* to turn.
static void loop_io_ev_cb(struct ev_loop *loop, struct ev_io *io, int revents)
{
  UNUSED_PARAM(loop);
  UNUSED_PARAM(revents);

  // uv_thread_t is not guaranteed to be an integer type; convert it before
  // comparing with the unsigned long recorded in main.
  const int on_main = ((unsigned long)uv_thread_self() == primordial_thread);
  printf("Checking ev loop at readable event on uv_pipe %s\n",
         on_main ? "MAIN" : "OFF MAIN");

  uv_loop_t *loop_uv = io->data;
  uv_run(loop_uv, UV_RUN_NOWAIT);
}
#elif defined TI_IDLE
// libev check callback (TI_IDLE variant).
//
// Runs one non-blocking pass of the libuv loop stored in check->data. When
// OUTPUT_ONSLAUGHT is defined it first logs whether it is running on the
// thread recorded in primordial_thread.
//
// loop, revents: unused.
// check:         watcher whose `data` member holds the uv_loop_t* to turn.
static void loop_check_ev_cb(struct ev_loop *loop, struct ev_check *check,
                             int revents)
{
  UNUSED_PARAM(loop);
  UNUSED_PARAM(revents);
#ifdef OUTPUT_ONSLAUGHT
  // uv_thread_t is not guaranteed to be an integer type; convert it before
  // comparing with the unsigned long recorded in main.
  printf("Checking ev loop at turn cycle on thread %s\n",
         ((unsigned long)uv_thread_self() == primordial_thread)
             ? "MAIN" : "OFF MAIN");
  printf("Turning uv_loop for finished jobs\n");
#endif
  uv_loop_t *loop_uv = check->data;
  uv_run(loop_uv, UV_RUN_NOWAIT);
}
// libev idle callback (TI_IDLE variant).
//
// Does no work of its own; it only logs (when OUTPUT_ONSLAUGHT is defined)
// whether it is running on the thread recorded in primordial_thread.
//
// loop, idle, revents: unused.
static void loop_idle_ev_cb(struct ev_loop *loop, struct ev_idle *idle,
                            int revents)
{
  UNUSED_PARAM(loop);
  UNUSED_PARAM(idle);
  UNUSED_PARAM(revents);
#ifdef OUTPUT_ONSLAUGHT
  // uv_thread_t is not guaranteed to be an integer type; convert it before
  // comparing with the unsigned long recorded in main.
  printf("Idle watcher turning EV loop on thread %s\n",
         ((unsigned long)uv_thread_self() == primordial_thread)
             ? "MAIN" : "OFF MAIN");
#endif
}
#endif
// Entry point of the libuv-threadpool-inside-libev demo.
//
// Records the main thread id, initializes the lock and the default libuv
// loop, starts the job-producing worker thread, registers the libev watcher
// selected at compile time (TI_TIMER, TI_IOPIPE or TI_IDLE) and then runs the
// libev main loop.
//
// Returns 0 when the libev loop finishes, EXIT_FAILURE if setup fails.
int main(void)
{
  // uv_thread_t is not guaranteed to be unsigned long; convert explicitly so
  // the value matches both %lu and the type of primordial_thread.
  const unsigned long self = (unsigned long)uv_thread_self();
  printf("Starting test on PRIMORDIAL thread id (%lu)\n", self);
  primordial_thread = self;

  // Spin up a UV loop and a lock - the lock is simply for the test.
  // The UV loop is necessary to pull messages off its queue and to 'transport'
  // that message back to our primordial (main) thread. This makes sure that
  // wherever a task is started we are notified on the mock V8 thread.
  if (uv_mutex_init(&lock) != 0) {
    fprintf(stderr, "uv_mutex_init failed\n");
    return EXIT_FAILURE;
  }
  uv_loop_t *loop_uv = uv_default_loop();
  if (loop_uv == NULL) {
    fprintf(stderr, "uv_default_loop failed\n");
    return EXIT_FAILURE;
  }

  // Create a thread that will go off and create jobs
  uv_thread_t uv_worker_id;
  if (uv_thread_create(&uv_worker_id, worker_uv, loop_uv) != 0) {
    fprintf(stderr, "uv_thread_create failed\n");
    return EXIT_FAILURE;
  }

  // Create the EV main loop. This is the mock doLoop and main thread for
  // scripting.
  struct ev_loop *main_loop = ev_default_loop(0);
  if (main_loop == NULL) {
    fprintf(stderr, "ev_default_loop failed\n");
    return EXIT_FAILURE;
  }

  // This is a method via EV loop timer to periodically take stuff from UV
#if defined TI_TIMER
  // Since we're not doing IO, mimic some events with a timer. In a system with
  // IO an ev_check will run and the uv_run can be called there.
  ev_timer loop_timer_ev;
  loop_timer_ev.data = (void*)loop_uv;
  ev_timer_init(&loop_timer_ev, loop_timer_ev_cb, 1, 1);
  ev_timer_start(main_loop, &loop_timer_ev);
  // This is a method to directly read from the UV loop's async notification pipe
#elif defined TI_IOPIPE
  // NOTE(review): async_watcher.io_watcher.fd is reached through libuv's loop
  // structure rather than an accessor function - confirm it exists in the
  // libuv version being built against.
  ev_io loop_io_ev;
  loop_io_ev.data = (void*)loop_uv;
  ev_io_init(&loop_io_ev, loop_io_ev_cb,
             loop_uv->async_watcher.io_watcher.fd, EV_READ);
  ev_io_start(main_loop, &loop_io_ev);
  // This method uses a Check event if the EV loop is spinning, or an Idle event
  // if it's inactive.
#elif defined TI_IDLE
  // Replicating the pattern from here
  // http://tinyurl.com/lfmy7zw
  // Low priority idle and a check event. The check event turns the uv_loop once
  // and the idle watcher ensures it doesn't get starved. Does cause a tight
  // loop, leaving an idle system looking very very busy.
  ev_check loop_check_ev;
  loop_check_ev.data = (void*)loop_uv;
  ev_check_init(&loop_check_ev, loop_check_ev_cb);
  ev_check_start(main_loop, &loop_check_ev);
  ev_idle loop_idle_ev;
  ev_idle_init(&loop_idle_ev, loop_idle_ev_cb);
  ev_set_priority(&loop_idle_ev, EV_MINPRI);
  ev_idle_start(main_loop, &loop_idle_ev);
#endif

  // The second argument of ev_loop() is a set of run flags, not a backend
  // selector, so EVBACKEND_KQUEUE does not belong here. 0 requests the normal
  // blocking run. A backend would be chosen via ev_default_loop() instead.
  ev_loop(main_loop, 0);
  return 0;
}