On Sat, 21 May 2016 10:26:17 +0200
Ben Noordhuis <...> wrote:
> On Fri, May 20, 2016 at 11:02 PM, 'Mikeus' via libuv
> <[email protected]> wrote:
> > On Thu, 19 May 2016 22:02:09 +0300
> > 'Mikeus' via libuv <...> wrote:
> >
> >> On Thu, 19 May 2016 17:47:52 +0200
> >> Ben Noordhuis <...> wrote:
> >>
> >> >
> >> > I suspect you're writing data faster than the other end of the pipe
> >> > can read it. Am I right that handle->write_queue_size keeps growing?
> >> >
> >>
> >> I've added code that prints write_queue_size right after the write
> >> request is created in read_cb(), and in the write callback function
> >> too. It shows the value is always either 0 or 6300144 (or 6300112) bytes.
> >>
> >
> > You turned out to be right, Ben.
> > After fixing a bug that I made when referencing the write_queue_size field,
> > I can see that the queue is growing:
>
> What you can do is call uv_read_stop() when handle->write_queue_size
> exceeds a threshold, and call uv_read_start() when it drops below the
> threshold again.
>
> (You probably already figured that out but I mention it in case people
> come across this thread.)
>
Thank you for the responses.
I've added the trigger you mentioned, fixed the bugs, and this simple program
works fine now.
I think it demonstrates well some essential points that one comes across
when beginning to develop programs using libuv. I've attached the resulting
source for completeness.
--
Mike
--
You received this message because you are subscribed to the Google Groups
"libuv" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/libuv.
For more options, visit https://groups.google.com/d/optout.
#include <uv.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
/*
 * Report a libuv error on stderr as "<prefix>: <NAME> (<code>): <message>".
 *
 * prefix - C string identifying the failed operation
 * code   - libuv error code (any integer type; converted to int)
 *
 * stdout is flushed before the message is written and stderr is flushed
 * afterwards. `code` is evaluated exactly once, so passing an expression
 * with side effects is safe.
 */
#define PRINT_UV_ERR(prefix, code) do {\
    const int print_uv_err_code_ = (int)(code);\
    fflush(stdout);\
    fprintf(stderr, "%s: %s (%i): %s\n", (prefix),\
            uv_err_name(print_uv_err_code_), print_uv_err_code_,\
            uv_strerror(print_uv_err_code_));\
    fflush(stderr);\
} while (0)
/* Pipe handles for the process's stdin and stdout; this assumes both
   descriptors can be handled as pipes. */
uv_pipe_t in;
uv_pipe_t out;

/* libuv callbacks, defined below */
void alloc_cb(uv_handle_t *_handle, size_t _suggested_size, uv_buf_t *_buf);
void read_cb(uv_stream_t *_stream, ssize_t _nread, const uv_buf_t *_buf);
void write_cb(uv_write_t *_wr, int _status);

/* Reading state of the input pipe:
     RD_NOP   - initial value, before reading has been started
     RD_START - reading is active
     RD_STOP  - reading was stopped after a read or write error
     RD_PAUSE - reading was stopped because the write queue reached the limit */
enum {
    RD_NOP,
    RD_START,
    RD_STOP,
    RD_PAUSE
} rdcmd_state = RD_NOP;

/* Write queue size (in bytes) at which reading is paused */
const size_t WRITE_QUEUE_LIMIT = 100 * 65536;
/*
 * Program entry point: opens stdin and stdout as libuv pipes, starts reading
 * from stdin and runs the default event loop.
 *
 * When BLOCK_SIGPIPE is defined, SIGPIPE is blocked first.
 *
 * Returns the result of uv_run() on normal completion, or the non-zero
 * result of the setup call that failed.
 */
int main(int _argc, char *_argv[])
{
    int ret = 0;

    /* command-line arguments are not used */
    (void)_argc;
    (void)_argv;

#ifdef BLOCK_SIGPIPE
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, SIGPIPE);
    ret = sigprocmask(SIG_BLOCK, &set, NULL);
    if (ret != 0)
    {
        perror("sigprocmask");
        return ret;
    }
#endif

    uv_loop_t *loop = uv_default_loop();
    if (loop == NULL)
    {
        fflush(stdout);
        fprintf(stderr, "uv_default_loop: cannot obtain the default loop\n");
        fflush(stderr);
        return EXIT_FAILURE;
    }

    ret = uv_pipe_init(loop, &in, 0);
    if (ret < 0)
    {
        PRINT_UV_ERR("stdin init", ret);
        return ret;
    }
    ret = uv_pipe_open(&in, fileno(stdin));
    if (ret < 0)
    {
        PRINT_UV_ERR("stdin open", ret);
        return ret;
    }

    ret = uv_pipe_init(loop, &out, 0);
    if (ret < 0)
    {
        PRINT_UV_ERR("stdout init", ret);
        return ret;
    }
    ret = uv_pipe_open(&out, fileno(stdout));
    if (ret < 0)
    {
        PRINT_UV_ERR("stdout open", ret);
        return ret;
    }

    /* only record the RD_START state once reading has really been started */
    ret = uv_read_start((uv_stream_t*)&in, alloc_cb, read_cb);
    if (ret < 0)
    {
        PRINT_UV_ERR("read start", ret);
        return ret;
    }
    rdcmd_state = RD_START;

    return uv_run(loop, UV_RUN_DEFAULT);
}
/*
 * Allocation callback passed to uv_read_start(): provides a freshly
 * malloc'ed I/O buffer of the suggested size.
 *
 * _handle         - handle the buffer is requested for (not used here)
 * _suggested_size - number of bytes to allocate
 * _buf            - receives the new buffer descriptor
 *
 * The result of malloc() is stored as is, without a NULL check.
 */
void alloc_cb(uv_handle_t *_handle, size_t _suggested_size, uv_buf_t *_buf)
{
    char *storage = (char*)malloc(_suggested_size);

    *_buf = uv_buf_init(storage, _suggested_size);
#ifndef NDEBUG
    fprintf(stderr, "[%p] create: I/O buffer\n", _buf->base); fflush(stderr);
#endif
}
/*
 * Read callback for the input pipe: forwards every chunk that was read to
 * the output pipe with uv_write().
 *
 * _stream - stream the data was read from
 * _nread  - number of bytes read; negative on error, 0 when nothing was read
 * _buf    - buffer obtained from alloc_cb()
 *
 * Buffer ownership: when a write request is queued successfully, the buffer
 * is handed over to it through wr->data and released in write_cb(). On every
 * other path (error, empty read, failure to queue the write) the buffer is
 * freed here, since no one else would free it.
 *
 * Reading is paused (RD_PAUSE) once the output write queue reaches
 * WRITE_QUEUE_LIMIT, and stopped (RD_STOP) on any error.
 */
void read_cb(uv_stream_t *_stream, ssize_t _nread, const uv_buf_t *_buf)
{
    if (_nread <= 0)
    {
        if (_nread < 0)
        {
            PRINT_UV_ERR("read", _nread);
            uv_read_stop(_stream);
            rdcmd_state = RD_STOP;
        }
        /* no write request will take over the buffer; free(NULL) is a no-op */
#ifndef NDEBUG
        fprintf(stderr, "[%p] free: I/O buffer\n", (void*)_buf->base);
        fflush(stderr);
#endif
        free(_buf->base);
        return;
    }

    /* initialize a new buffer descriptor specifying the actual data length */
    uv_buf_t buf = uv_buf_init(_buf->base, (unsigned int)_nread);

    /* create a write request descriptor */
    uv_write_t *wr = (uv_write_t*)malloc(sizeof *wr);
    if (wr == NULL)
    {
        fflush(stdout);
        fprintf(stderr, "read: cannot allocate a write request\n");
        fflush(stderr);
        uv_read_stop(_stream);
        rdcmd_state = RD_STOP;
        free(_buf->base);
        return;
    }
#ifndef NDEBUG
    fprintf(stderr, "[%p] create: write request\n", (void*)wr); fflush(stderr);
#endif

    /* keep a reference to the I/O buffer along with the write request so
       that write_cb() can free it */
    wr->data = _buf->base;

    /* fire up the write request */
    int ret = uv_write(wr, (uv_stream_t*)&out, &buf, 1, write_cb);
    if (ret < 0)
    {
        /* the request was not queued, so write_cb() will never run for it */
        PRINT_UV_ERR("write", ret);
        uv_read_stop(_stream);
        rdcmd_state = RD_STOP;
#ifndef NDEBUG
        fprintf(stderr, "[%p] free: I/O buffer\n", (void*)_buf->base);
        fprintf(stderr, "[%p] free: write request\n", (void*)wr);
        fflush(stderr);
#endif
        free(_buf->base);
        free(wr);
        return;
    }

#ifndef NDEBUG
    fprintf(stderr, "write_queue_size@read_cb=[%zu]\n", out.write_queue_size);
    fflush(stderr);
#endif

    /* pause reading while the output side is lagging behind */
    if (rdcmd_state == RD_START && out.write_queue_size >= WRITE_QUEUE_LIMIT)
    {
        uv_read_stop((uv_stream_t*)&in);
        rdcmd_state = RD_PAUSE;
#ifndef NDEBUG
        fprintf(stderr, "RD_PAUSE=[ON]@write_queue_size=[%zu]\n",
                out.write_queue_size); fflush(stderr);
#endif
    }
}
/*
 * Write completion callback: releases the I/O buffer and the write request,
 * and resumes reading when it was paused and the write queue has dropped
 * below WRITE_QUEUE_LIMIT.
 *
 * _wr     - completed write request; _wr->data holds the I/O buffer that
 *           read_cb() attached to it
 * _status - 0 on success, negative libuv error code on failure
 *
 * On a write error reading is stopped (RD_STOP) and is not resumed.
 */
void write_cb(uv_write_t *_wr, int _status)
{
    if (_status < 0)
    {
        PRINT_UV_ERR("write", _status);
        uv_read_stop((uv_stream_t*)&in);
        rdcmd_state = RD_STOP;
    }
#ifndef NDEBUG
    fprintf(stderr, "write_queue_size@write_cb=[%zu]\n",
            _wr->handle->write_queue_size); fflush(stderr);
#endif

    if (rdcmd_state == RD_PAUSE &&
        _wr->handle->write_queue_size < WRITE_QUEUE_LIMIT)
    {
        int ret = uv_read_start((uv_stream_t*)&in, alloc_cb, read_cb);
        if (ret < 0)
        {
            /* reading could not be resumed; do not pretend it is running */
            PRINT_UV_ERR("read start", ret);
            rdcmd_state = RD_STOP;
        }
        else
        {
            rdcmd_state = RD_START;
#ifndef NDEBUG
            fprintf(stderr, "RD_PAUSE=[OFF]@write_queue_size=[%zu]\n",
                    _wr->handle->write_queue_size); fflush(stderr);
#endif
        }
    }

    /* When the write request has completed it's safe to free up the memory
       allocated for the I/O buffer. The pointers are logged before free()
       so that no pointer value is used after it has been freed. */
#ifndef NDEBUG
    fprintf(stderr, "[%p] free: I/O buffer\n", _wr->data); fflush(stderr);
#endif
    free(_wr->data);

    /* delete the write request descriptor */
#ifndef NDEBUG
    fprintf(stderr, "[%p] free: write request\n", (void*)_wr); fflush(stderr);
#endif
    free(_wr);
}