On Sat, 21 May 2016 10:26:17 +0200
Ben Noordhuis <...> wrote:

> On Fri, May 20, 2016 at 11:02 PM, 'Mikeus' via libuv
> <[email protected]> wrote:
> > On Thu, 19 May 2016 22:02:09 +0300
> > 'Mikeus' via libuv <...> wrote:
> >
> >> On Thu, 19 May 2016 17:47:52 +0200
> >> Ben Noordhuis <...> wrote:
> >>
> >> >
> >> > I suspect you're writing data faster than the other end of the pipe
> >> > can read it.  Am I right that handle->write_queue_size keeps growing?
> >> >
> >>
> >> I've added code that prints write_queue_size after the write request is
> >> created in read_cb(), and in the write callback function too. It shows
> >> that the value is always either 0 or 6300144 (or 6300112) bytes.
> >>
> >
> > You turned out to be right, Ben.
> > After fixing a bug that I had made when referencing the write_queue_size
> > field, it can be seen that the queue is growing:
> 
> What you can do is call uv_read_stop() when handle->write_queue_size
> exceeds a threshold, and call uv_read_start() when it drops below the
> threshold again.
> 
> (You probably already figured that out but I mention it in case people
> come across this thread.)
> 

Thank you for the responses.
I've added the trigger you mentioned, fixed the bugs, and this simple program
works fine now.
I think it demonstrates well some essential points that one comes across
when beginning to develop programs using libuv. I've attached the resulting
source for completeness.

-- 
Mike


-- 
You received this message because you are subscribed to the Google Groups 
"libuv" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/libuv.
For more options, visit https://groups.google.com/d/optout.
#include <uv.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>


/*
 * Report a libuv error on stderr in the form
 *   "<prefix>: <error name> (<numeric code>): <error description>"
 *
 *   prefix - C string naming the failed operation
 *   code   - libuv error code; converted to int and evaluated exactly once
 *
 * stdout is flushed before the message is written and stderr right after it.
 * Every line of the macro body except the last must end with a backslash.
 */
#define PRINT_UV_ERR(prefix, code)  do {\
    const int print_uv_err_code_ = (int)(code);\
    fflush(stdout);\
    fprintf(stderr, "%s: %s (%i): %s\n", (prefix),\
            uv_err_name(print_uv_err_code_), print_uv_err_code_,\
            uv_strerror(print_uv_err_code_));\
    fflush(stderr);\
} while (0)


/* Pipe handles that main() opens on stdin (`in`) and stdout (`out`);
 * this assumes that stdin and stdout can be handled as pipes. */
uv_pipe_t in, out;


/* libuv callbacks, defined further down in this file */
void alloc_cb(uv_handle_t*, size_t, uv_buf_t*);
void read_cb(uv_stream_t*, ssize_t, const uv_buf_t*);
void write_cb(uv_write_t*, int);


/* Reading state of `in`:
 *   RD_NOP   - initial value, reading has not been started yet
 *   RD_START - set after uv_read_start() has been called on `in`
 *   RD_STOP  - set after a read or write error; reading is not resumed
 *   RD_PAUSE - set by read_cb() when it stops reading because the write queue
 *              of `out` has reached WRITE_QUEUE_LIMIT; write_cb() restarts
 *              reading once the queue is below the limit again */
enum { RD_NOP, RD_START, RD_STOP, RD_PAUSE } rdcmd_state = RD_NOP;

/* Threshold for out.write_queue_size (100*65536 = 6553600): reading is paused
 * at or above this value and resumed below it. */
const size_t WRITE_QUEUE_LIMIT = 100*65536;


/*
 * Program entry point.
 *
 * Opens stdin and stdout as libuv pipe handles (`in` and `out`) on the default
 * loop, starts reading from `in` with alloc_cb()/read_cb() and runs the loop.
 * When built with BLOCK_SIGPIPE defined, SIGPIPE is blocked first.
 *
 * The command-line arguments are not used.
 *
 * Returns the result of uv_run() when the loop was run; otherwise the non-zero
 * result of the set-up call that failed (after printing a diagnostic), or
 * EXIT_FAILURE if no default loop could be obtained.
 */
int main(int _argc, char *_argv[])
{
  int ret = 0;

  (void)_argc;
  (void)_argv;

#ifdef BLOCK_SIGPIPE
  sigset_t set;
  sigemptyset(&set);
  sigaddset(&set, SIGPIPE);
  ret = sigprocmask(SIG_BLOCK, &set, NULL);
  if (ret != 0)
  {
    perror("sigprocmask");
    return ret;
  }
#endif

  uv_loop_t *loop = uv_default_loop();
  if (loop == NULL)
  {
    /* uv_default_loop() is documented to return NULL on allocation failure */
    fflush(stdout);
    fprintf(stderr, "uv_default_loop: no default loop available\n");
    fflush(stderr);
    return EXIT_FAILURE;
  }

  ret = uv_pipe_init(loop, &in, 0);
  if (ret < 0)
  {
    PRINT_UV_ERR("stdin init", ret);
    return ret;
  }
  ret = uv_pipe_open(&in, fileno(stdin));
  if (ret < 0)
  {
    PRINT_UV_ERR("stdin open", ret);
    return ret;
  }

  ret = uv_pipe_init(loop, &out, 0);
  if (ret < 0)
  {
    PRINT_UV_ERR("stdout init", ret);
    return ret;
  }
  ret = uv_pipe_open(&out, fileno(stdout));
  if (ret < 0)
  {
    PRINT_UV_ERR("stdout open", ret);
    return ret;
  }

  /* only mark reading as started when libuv actually accepted the request */
  ret = uv_read_start((uv_stream_t*)&in, alloc_cb, read_cb);
  if (ret < 0)
  {
    PRINT_UV_ERR("read start", ret);
    return ret;
  }
  rdcmd_state = RD_START;

  return uv_run(loop, UV_RUN_DEFAULT);
}



/*
 * libuv allocation callback: provides the buffer for the next read.
 *
 *   _handle         - handle the buffer is requested for (not used)
 *   _suggested_size - buffer size proposed by libuv
 *   _buf            - out: receives the buffer descriptor
 *
 * The buffer is obtained with malloc(); it is released by whoever ends up
 * owning it after the read. If the allocation fails, an empty descriptor
 * (NULL base, zero length) is returned; per the libuv uv_alloc_cb
 * documentation this makes libuv report UV_ENOBUFS to the read callback.
 */
void alloc_cb(uv_handle_t *_handle, size_t _suggested_size, uv_buf_t *_buf)
{
  char *base = (char*)malloc(_suggested_size);

  (void)_handle;

  /* never advertise a non-zero length for memory that was not obtained */
  *_buf = uv_buf_init(base, base != NULL ? (unsigned int)_suggested_size : 0u);
#ifndef NDEBUG
  fprintf(stderr, "[%p] create: I/O buffer\n", (void*)_buf->base);  fflush(stderr);
#endif
}


/* Release an I/O buffer that was never handed over to a write request. */
static void release_unsent_buffer(char *base)
{
#ifndef NDEBUG
  fprintf(stderr, "[%p] free: I/O buffer\n", (void*)base);  fflush(stderr);
#endif
  free(base);
}


/*
 * libuv read callback for `in`: forwards the data just read to `out`.
 *
 *   _stream - stream the data was read from
 *   _nread  - > 0: number of bytes available in _buf;
 *             < 0: libuv error code (the error is printed, reading is stopped
 *                  and rdcmd_state becomes RD_STOP);
 *               0: nothing to process
 *   _buf    - buffer obtained from alloc_cb(); this callback owns it
 *
 * For _nread > 0 a write request is allocated, the buffer base is stored in
 * its `data` field and the request is submitted with write_cb() as completion
 * callback; write_cb() then releases both. In every other case, and when the
 * request cannot be allocated or submitted, the buffer is released here,
 * because write_cb() will not run for it.
 *
 * After a successful submission, reading is paused (rdcmd_state = RD_PAUSE)
 * when the write queue of `out` has reached WRITE_QUEUE_LIMIT.
 */
void read_cb(uv_stream_t *_stream, ssize_t _nread, const uv_buf_t *_buf)
{
  if (_nread <= 0)
  {
    if (_nread < 0)
    {
      PRINT_UV_ERR("read", _nread);
      uv_read_stop(_stream);
      rdcmd_state = RD_STOP;
    }
    /* nothing gets written, so the buffer has to be released right here */
    release_unsent_buffer(_buf->base);
    return;
  }

  /* initialize a new buffer descriptor specifying the actual data length */
  uv_buf_t buf = uv_buf_init(_buf->base, (unsigned int)_nread);

  /* create a write request descriptor */
  uv_write_t *wr = (uv_write_t*)malloc(sizeof(*wr));
  if (wr == NULL)
  {
    PRINT_UV_ERR("write request alloc", UV_ENOMEM);
    uv_read_stop(_stream);
    rdcmd_state = RD_STOP;
    release_unsent_buffer(_buf->base);
    return;
  }
#ifndef NDEBUG
  fprintf(stderr, "[%p] create: write request\n", (void*)wr);  fflush(stderr);
#endif

  /* keep a reference to the I/O buffer along with the write request so that
     write_cb() can release it once the write has completed */
  wr->data = _buf->base;

  /* fire up the write request */
  const int ret = uv_write(wr, (uv_stream_t*)&out, &buf, 1, write_cb);
  if (ret < 0)
  {
    /* the request was rejected: write_cb() is not going to be called for it,
       so the request and the buffer are released here */
    PRINT_UV_ERR("write", ret);
    uv_read_stop(_stream);
    rdcmd_state = RD_STOP;
#ifndef NDEBUG
    fprintf(stderr, "[%p] free: write request\n", (void*)wr);  fflush(stderr);
#endif
    free(wr);
    release_unsent_buffer(_buf->base);
    return;
  }

#ifndef NDEBUG
  fprintf(stderr, "write_queue_size@read_cb=[%zu]\n", out.write_queue_size);
  fflush(stderr);
#endif
  /* back-pressure: stop reading while too much data is waiting to be written */
  if (rdcmd_state == RD_START && out.write_queue_size >= WRITE_QUEUE_LIMIT)
  {
    uv_read_stop((uv_stream_t*)&in);
    rdcmd_state = RD_PAUSE;
#ifndef NDEBUG
    fprintf(stderr, "RD_PAUSE=[ON]@write_queue_size=[%zu]\n",
            out.write_queue_size);
    fflush(stderr);
#endif
  }
}


/*
 * libuv write completion callback for requests submitted by read_cb().
 *
 *   _wr     - the completed write request; its `data` field holds the base
 *             address of the I/O buffer that was written
 *   _status - 0 on success, a libuv error code (< 0) on failure
 *
 * On failure the error is printed, reading from `in` is stopped and
 * rdcmd_state becomes RD_STOP. Otherwise, if reading is paused (RD_PAUSE) and
 * the write queue of the request's stream is below WRITE_QUEUE_LIMIT, reading
 * is started again. In all cases the I/O buffer and the request itself are
 * released before returning.
 */
void write_cb(uv_write_t *_wr, int _status)
{
  if (_status < 0)
  {
    PRINT_UV_ERR("write", _status);
    uv_read_stop((uv_stream_t*)&in);
    rdcmd_state = RD_STOP;
  }

#ifndef NDEBUG
  fprintf(stderr, "write_queue_size@write_cb=[%zu]\n",
          _wr->handle->write_queue_size);
  fflush(stderr);
#endif
  if (rdcmd_state == RD_PAUSE && _wr->handle->write_queue_size < WRITE_QUEUE_LIMIT)
  {
    const int ret = uv_read_start((uv_stream_t*)&in, alloc_cb, read_cb);
    if (ret < 0)
    {
      /* reading could not be resumed: do not pretend that it is running */
      PRINT_UV_ERR("read start", ret);
      rdcmd_state = RD_STOP;
    }
    else
    {
      rdcmd_state = RD_START;
#ifndef NDEBUG
      fprintf(stderr, "RD_PAUSE=[OFF]@write_queue_size=[%zu]\n",
              _wr->handle->write_queue_size);
      fflush(stderr);
#endif
    }
  }

  /* when the write request has completed it's safe to free up the memory
     allocated for the I/O buffer; the trace is printed first because neither
     pointer may be used any more once it has been passed to free() */
#ifndef NDEBUG
  fprintf(stderr, "[%p] free: I/O buffer\n", _wr->data);  fflush(stderr);
#endif
  free(_wr->data);

  /* delete the write request descriptor */
#ifndef NDEBUG
  fprintf(stderr, "[%p] free: write request\n", (void*)_wr);  fflush(stderr);
#endif
  free(_wr);
}

Reply via email to