On Tue, Feb 08, 2022 at 02:45:19PM +0300, Vladimir Homutov wrote:
> On Mon, Feb 07, 2022 at 05:16:17PM +0300, Roman Arutyunyan wrote:
> > Hi,
> >
> > On Fri, Feb 04, 2022 at 04:56:23PM +0300, Vladimir Homutov wrote:
> > > On Tue, Feb 01, 2022 at 04:39:59PM +0300, Roman Arutyunyan wrote:
> > > > # HG changeset patch
> > > > # User Roman Arutyunyan <a...@nginx.com>
> > > > # Date 1643722727 -10800
> > > > # Tue Feb 01 16:38:47 2022 +0300
> > > > # Branch quic
> > > > # Node ID db31ae16c1f2050be9c9f6b1f117ab6725b97dd4
> > > > # Parent 308ac307b3e6952ef0c5ccf10cc82904c59fa4c3
> > > > QUIC: stream lingering.
> > > >
> > > > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > > > can persist after connection is closed by application. During this period,
> > > > server is expecting stream final size from client for correct flow control.
> > > > Also, buffered output is sent to client as more flow control credit is
> > > > granted.
> > > >
> > > [..]
> > >
> > > > +static ngx_int_t
> > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > > +{
> > > > +    size_t limit, len;
> > > > +    ngx_uint_t last;
> > > > +    ngx_chain_t *out, *cl;
> > > > +    ngx_quic_frame_t *frame;
> > > > +    ngx_connection_t *pc;
> > > > +    ngx_quic_connection_t *qc;
> > > > +
> > > > +    if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > > > +        return NGX_OK;
> > > > +    }
> > > > +
> > > > +    pc = qs->parent;
> > > > +    qc = ngx_quic_get_connection(pc);
> > > > +
> > > > +    limit = ngx_quic_max_stream_flow(qs);
> > > > +    last = 0;
> > > > +
> > > > +    out = ngx_quic_read_chain(pc, &qs->out, limit);
> > > > +    if (out == NGX_CHAIN_ERROR) {
> > > > +        return NGX_ERROR;
> > > > +    }
> > > > +
> > > > +    len = 0;
> > > > +    last = 0;
> > >
> > > this assignment looks duplicate.
> >
> > Thanks, fixed.
> >
> > > [..]
> > >
> > > > +static ngx_int_t
> > > > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > > > +{
> > > >      ngx_connection_t *pc;
> > > >      ngx_quic_frame_t *frame;
> > > > -    ngx_quic_stream_t *qs;
> > > >      ngx_quic_connection_t *qc;
> > > >
> > > > -    qs = c->quic;
> > > >      pc = qs->parent;
> > > >      qc = ngx_quic_get_connection(pc);
> > > >
> > > > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > > > -                   "quic stream id:0x%xL cleanup", qs->id);
> > > > +    if (!qc->closing) {
> > > > +        if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > > > +            || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > > > +            || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > > > +        {
> > >
> > > so basically these are the states where we need to wait for FIN?
> > > and thus avoid closing till we get it.
> > > I would add a comment here.
> >
> > On the receiving end we wait either for fin or for reset to have final size.
> > On the sending end we wait for everything that's buffered to be sent.
> > Added a comment about that.
> >
> > > [..]
> > > > +    if (qs->connection == NULL) {
> > > > +        return ngx_quic_close_stream(qs);
> > > > +    }
> > > > +
> > > >      ngx_quic_set_event(qs->connection->write);
> > >
> > > this pattern - check connection, close if NULL and set event - seems to
> > > repeat. Maybe it's worth trying to put this check/action into
> > > ngx_quic_set_event somehow? we could instead have
> > > set_read_event/set_write_event maybe.
> >
> > I thought about this too, but it's not always that simple. And even if it was,
> > the new function/macro would have unclear semantics. Let's just remember this
> > as a possible future optimization.
> >
> > > > +static ngx_int_t
> > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > > +
> > > [..]
> > > > +    if (len == 0 && !last) {
> > > > +        return NGX_OK;
> > > > +    }
> > > > +
> > > > +    frame = ngx_quic_alloc_frame(pc);
> > > > +    if (frame == NULL) {
> > > > +        return NGX_ERROR;
> > > > +    }
> > > > +
> > > > +    frame = ngx_quic_alloc_frame(pc);
> > > > +    if (frame == NULL) {
> > > > +        return NGX_ERROR;
> > > > +    }
> > >
> > > one more dup here.
> >
> > Yes, thanks.
> >
> > > Overall, it looks good, but the testing revealed another issue: with big
> > > buffer sizes we run into the issue of too long chains in
> > > ngx_quic_write_chain().
> > > As discussed, this certainly needs optimization - probably adding some
> > > pointer to the end to facilitate appending, or something else.
> >
> > It's true ngx_quic_write_chain() needs to be optimized. When the buffered
> > chain is big, it takes too much time to find the write point. I'll address
> > this in a separate patch. Meanwhile, attached is an updated version of the
> > current one.
> >
> > In the new version of the patch I also eliminated the
> > ngx_quic_max_stream_flow() function and embedded its content in
> > ngx_quic_stream_flush().
>
> yes, this looks correct - flow limit should not consider buffer as it
> was before.
>
> I think we should check for limit == 0 before doing read_chain and this
> is a good place for debug logging about 'hit MAX_DATA/MAX_STREAM_DATA' that
> was removed by the update.
I don't know how much we really need those messages. What really needs to be
added here is sending DATA_BLOCKED/STREAM_DATA_BLOCKED, for which I already
have a separate patch. That patch also adds some logging. Once we finish with
the optimization, I'll send it out.

Apart from logging, checking limit == 0 does not seem to make sense, because
even if the limit is zero, we should still proceed, since we are still able to
send fin.

> > --
> > Roman Arutyunyan
>
> > # HG changeset patch
> > # User Roman Arutyunyan <a...@nginx.com>
> > # Date 1644054894 -10800
> > # Sat Feb 05 12:54:54 2022 +0300
> > # Branch quic
> > # Node ID 6e1674c257709341a7508ae4bdab6f7f7d2e9284
> > # Parent 6c1dfd072859022f830aeea49db7cbe3c9f7fb55
> > QUIC: stream lingering.
> >
> > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > can persist after connection is closed by application. During this period,
> > server is expecting stream final size from client for correct flow control.
> > Also, buffered output is sent to client as more flow control credit is
> > granted.
> > > > diff --git a/src/event/quic/ngx_event_quic.c > > b/src/event/quic/ngx_event_quic.c > > --- a/src/event/quic/ngx_event_quic.c > > +++ b/src/event/quic/ngx_event_quic.c > > @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t > > ctp->active_connection_id_limit = 2; > > > > ngx_queue_init(&qc->streams.uninitialized); > > + ngx_queue_init(&qc->streams.free); > > > > qc->streams.recv_max_data = qc->tp.initial_max_data; > > qc->streams.recv_window = qc->streams.recv_max_data; > > diff --git a/src/event/quic/ngx_event_quic.h > > b/src/event/quic/ngx_event_quic.h > > --- a/src/event/quic/ngx_event_quic.h > > +++ b/src/event/quic/ngx_event_quic.h > > @@ -78,12 +78,14 @@ struct ngx_quic_stream_s { > > uint64_t id; > > uint64_t acked; > > uint64_t send_max_data; > > + uint64_t send_offset; > > + uint64_t send_final_size; > > uint64_t recv_max_data; > > uint64_t recv_offset; > > uint64_t recv_window; > > uint64_t recv_last; > > uint64_t recv_size; > > - uint64_t final_size; > > + uint64_t recv_final_size; > > ngx_chain_t *in; > > ngx_chain_t *out; > > ngx_uint_t cancelable; /* unsigned cancelable:1; > > */ > > diff --git a/src/event/quic/ngx_event_quic_connection.h > > b/src/event/quic/ngx_event_quic_connection.h > > --- a/src/event/quic/ngx_event_quic_connection.h > > +++ b/src/event/quic/ngx_event_quic_connection.h > > @@ -114,13 +114,16 @@ struct ngx_quic_socket_s { > > typedef struct { > > ngx_rbtree_t tree; > > ngx_rbtree_node_t sentinel; > > + > > ngx_queue_t uninitialized; > > + ngx_queue_t free; > > > > uint64_t sent; > > uint64_t recv_offset; > > uint64_t recv_window; > > uint64_t recv_last; > > uint64_t recv_max_data; > > + uint64_t send_offset; > > uint64_t send_max_data; > > > > uint64_t server_max_streams_uni; > > diff --git a/src/event/quic/ngx_event_quic_frames.c > > b/src/event/quic/ngx_event_quic_frames.c > > --- a/src/event/quic/ngx_event_quic_frames.c > > +++ b/src/event/quic/ngx_event_quic_frames.c > > @@ -391,6 +391,10 @@ 
ngx_quic_split_frame(ngx_connection_t *c > > return NGX_ERROR; > > } > > > > + if (f->type == NGX_QUIC_FT_STREAM) { > > + f->u.stream.fin = 0; > > + } > > + > > ngx_queue_insert_after(&f->queue, &nf->queue); > > > > return NGX_OK; > > diff --git a/src/event/quic/ngx_event_quic_streams.c > > b/src/event/quic/ngx_event_quic_streams.c > > --- a/src/event/quic/ngx_event_quic_streams.c > > +++ b/src/event/quic/ngx_event_quic_streams.c > > @@ -13,6 +13,8 @@ > > #define NGX_QUIC_STREAM_GONE (void *) -1 > > > > > > +static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, > > + ngx_uint_t err); > > static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c); > > static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c); > > static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, > > uint64_t id); > > @@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_ > > size_t size); > > static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, > > ngx_chain_t *in, off_t limit); > > -static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); > > +static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs); > > static void ngx_quic_stream_cleanup_handler(void *data); > > -static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last); > > -static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last); > > -static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c); > > +static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs); > > +static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t > > last); > > +static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t > > last); > > +static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs); > > static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c); > > static void ngx_quic_set_event(ngx_event_t *ev); > > > > @@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t > > ns = 0; > > #endif > > > > 
- for (node = ngx_rbtree_min(tree->root, tree->sentinel); > > - node; > > - node = ngx_rbtree_next(tree, node)) > > - { > > + node = ngx_rbtree_min(tree->root, tree->sentinel); > > + > > + while (node) { > > qs = (ngx_quic_stream_t *) node; > > + node = ngx_rbtree_next(tree, node); > > > > qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; > > qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; > > > > + if (qs->connection == NULL) { > > + ngx_quic_close_stream(qs); > > + continue; > > + } > > + > > ngx_quic_set_event(qs->connection->read); > > ngx_quic_set_event(qs->connection->write); > > > > @@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t > > ngx_int_t > > ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) > > { > > + return ngx_quic_do_reset_stream(c->quic, err); > > +} > > + > > + > > +static ngx_int_t > > +ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err) > > +{ > > ngx_connection_t *pc; > > ngx_quic_frame_t *frame; > > - ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > > > - qs = c->quic; > > - > > if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD > > || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT > > || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) > > @@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t * > > } > > > > qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; > > + qs->send_final_size = qs->send_offset; > > > > pc = qs->parent; > > qc = ngx_quic_get_connection(pc); > > > > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > + "quic stream id:0x%xL reset", qs->id); > > + > > frame = ngx_quic_alloc_frame(pc); > > if (frame == NULL) { > > return NGX_ERROR; > > @@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t * > > frame->type = NGX_QUIC_FT_RESET_STREAM; > > frame->u.reset_stream.id = qs->id; > > frame->u.reset_stream.error_code = err; > > - frame->u.reset_stream.final_size = c->sent; > > + frame->u.reset_stream.final_size = qs->send_offset; > > > > 
ngx_quic_queue_frame(qc, frame); > > > > + ngx_quic_free_chain(pc, qs->out); > > + qs->out = NULL; > > + > > return NGX_OK; > > } > > > > @@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_ > > static ngx_int_t > > ngx_quic_shutdown_stream_send(ngx_connection_t *c) > > { > > - ngx_connection_t *pc; > > - ngx_quic_frame_t *frame; > > - ngx_quic_stream_t *qs; > > - ngx_quic_connection_t *qc; > > + ngx_quic_stream_t *qs; > > > > qs = c->quic; > > > > @@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connec > > return NGX_OK; > > } > > > > - qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; > > - > > - pc = qs->parent; > > - qc = ngx_quic_get_connection(pc); > > + qs->send_state = NGX_QUIC_STREAM_SEND_SEND; > > + qs->send_final_size = c->sent; > > > > - frame = ngx_quic_alloc_frame(pc); > > - if (frame == NULL) { > > - return NGX_ERROR; > > - } > > - > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, > > "quic stream id:0x%xL send shutdown", qs->id); > > > > - frame->level = ssl_encryption_application; > > - frame->type = NGX_QUIC_FT_STREAM; > > - frame->u.stream.off = 1; > > - frame->u.stream.len = 1; > > - frame->u.stream.fin = 1; > > - > > - frame->u.stream.stream_id = qs->id; > > - frame->u.stream.offset = c->sent; > > - frame->u.stream.length = 0; > > - > > - ngx_quic_queue_frame(qc, frame); > > - > > - return NGX_OK; > > + return ngx_quic_stream_flush(qs); > > } > > > > > > @@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connec > > return NGX_ERROR; > > } > > > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > "quic stream id:0x%xL recv shutdown", qs->id); > > > > frame->level = ssl_encryption_application; > > @@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t > > { > > ngx_log_t *log; > > ngx_pool_t *pool; > > + ngx_queue_t *q; > > ngx_connection_t *sc; > > ngx_quic_stream_t *qs; > > ngx_pool_cleanup_t *cln; > > 
@@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t > > > > qc = ngx_quic_get_connection(c); > > > > - pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); > > - if (pool == NULL) { > > - return NULL; > > + if (!ngx_queue_empty(&qc->streams.free)) { > > + q = ngx_queue_head(&qc->streams.free); > > + qs = ngx_queue_data(q, ngx_quic_stream_t, queue); > > + ngx_queue_remove(&qs->queue); > > + > > + } else { > > + /* > > + * the number of streams is limited by transport > > + * parameters and application requirements > > + */ > > + > > + qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t)); > > + if (qs == NULL) { > > + return NULL; > > + } > > } > > > > - qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t)); > > - if (qs == NULL) { > > - ngx_destroy_pool(pool); > > - return NULL; > > - } > > + ngx_memzero(qs, sizeof(ngx_quic_stream_t)); > > > > qs->node.key = id; > > qs->parent = c; > > qs->id = id; > > - qs->final_size = (uint64_t) -1; > > + qs->send_final_size = (uint64_t) -1; > > + qs->recv_final_size = (uint64_t) -1; > > + > > + pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); > > + if (pool == NULL) { > > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > > + return NULL; > > + } > > > > log = ngx_palloc(pool, sizeof(ngx_log_t)); > > if (log == NULL) { > > ngx_destroy_pool(pool); > > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > > return NULL; > > } > > > > @@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t > > sc = ngx_get_connection(c->fd, log); > > if (sc == NULL) { > > ngx_destroy_pool(pool); > > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > > return NULL; > > } > > > > @@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t > > if (cln == NULL) { > > ngx_close_connection(sc); > > ngx_destroy_pool(pool); > > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > > return NULL; > > } > > > > @@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c > > return NGX_ERROR; > > } > > > > - 
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, > > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > "quic stream id:0x%xL recv buf:%uz", qs->id, size); > > > > if (size == 0) { > > @@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c > > rev->ready = 0; > > > > if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD > > - && qs->recv_offset == qs->final_size) > > + && qs->recv_offset == qs->recv_final_size) > > { > > qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; > > } > > @@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c > > ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, > > "quic stream id:0x%xL recv len:%z", qs->id, len); > > > > - if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) { > > + if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) { > > return NGX_ERROR; > > } > > > > @@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connectio > > off_t flow; > > size_t n; > > ngx_event_t *wev; > > - ngx_chain_t *out; > > ngx_connection_t *pc; > > - ngx_quic_frame_t *frame; > > ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > > > @@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connectio > > > > qs->send_state = NGX_QUIC_STREAM_SEND_SEND; > > > > - flow = ngx_quic_max_stream_flow(c); > > + flow = qs->acked + qc->conf->stream_buffer_size - c->sent; > > + > > if (flow == 0) { > > wev->ready = 0; > > return in; > > @@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connectio > > limit = flow; > > } > > > > - in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n); > > + in = ngx_quic_write_chain(pc, &qs->out, in, limit, > > + c->sent - qs->send_offset, &n); > > if (in == NGX_CHAIN_ERROR) { > > return NGX_CHAIN_ERROR; > > } > > > > - out = ngx_quic_read_chain(pc, &qs->out, n); > > - if (out == NGX_CHAIN_ERROR) { > > - return NGX_CHAIN_ERROR; > > - } > > - > > - frame = ngx_quic_alloc_frame(pc); > > - if (frame == NULL) { > > - return NGX_CHAIN_ERROR; > > - } > > - > > - frame->level = ssl_encryption_application; 
> > - frame->type = NGX_QUIC_FT_STREAM; > > - frame->data = out; > > - frame->u.stream.off = 1; > > - frame->u.stream.len = 1; > > - frame->u.stream.fin = 0; > > - > > - frame->u.stream.stream_id = qs->id; > > - frame->u.stream.offset = c->sent; > > - frame->u.stream.length = n; > > - > > c->sent += n; > > qc->streams.sent += n; > > > > - ngx_quic_queue_frame(qc, frame); > > - > > if (in) { > > wev->ready = 0; > > } > > @@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connectio > > ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > "quic send_chain sent:%uz", n); > > > > + if (ngx_quic_stream_flush(qs) != NGX_OK) { > > + return NGX_CHAIN_ERROR; > > + } > > + > > return in; > > } > > > > > > -static size_t > > -ngx_quic_max_stream_flow(ngx_connection_t *c) > > +static ngx_int_t > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs) > > { > > - size_t size; > > - uint64_t sent, unacked; > > - ngx_quic_stream_t *qs; > > + off_t limit; > > + size_t len; > > + ngx_uint_t last; > > + ngx_chain_t *out, *cl; > > + ngx_quic_frame_t *frame; > > + ngx_connection_t *pc; > > ngx_quic_connection_t *qc; > > > > - qs = c->quic; > > - qc = ngx_quic_get_connection(qs->parent); > > + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) { > > + return NGX_OK; > > + } > > > > - size = qc->conf->stream_buffer_size; > > - sent = c->sent; > > - unacked = sent - qs->acked; > > + pc = qs->parent; > > + qc = ngx_quic_get_connection(pc); > > > > if (qc->streams.send_max_data == 0) { > > qc->streams.send_max_data = qc->ctp.initial_max_data; > > } > > > > - if (unacked >= size) { > > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic send flow hit buffer size"); > > - return 0; > > + limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset, > > + qs->send_max_data - qs->send_offset); > > + > > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > + "quic stream id:0x%xL flush limit:%O", qs->id, limit); > > + > > + out = ngx_quic_read_chain(pc, &qs->out, limit); > > + if (out == 
NGX_CHAIN_ERROR) { > > + return NGX_ERROR; > > } > > > > - size -= unacked; > > + len = 0; > > + last = 0; > > + > > + for (cl = out; cl; cl = cl->next) { > > + len += cl->buf->last - cl->buf->pos; > > + } > > > > - if (qc->streams.sent >= qc->streams.send_max_data) { > > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic send flow hit MAX_DATA"); > > - return 0; > > + if (qs->send_final_size != (uint64_t) -1 > > + && qs->send_final_size == qs->send_offset + len) > > + { > > + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; > > + last = 1; > > + } > > + > > + if (len == 0 && !last) { > > + return NGX_OK; > > } > > > > - if (qc->streams.sent + size > qc->streams.send_max_data) { > > - size = qc->streams.send_max_data - qc->streams.sent; > > + frame = ngx_quic_alloc_frame(pc); > > + if (frame == NULL) { > > + return NGX_ERROR; > > } > > > > - if (sent >= qs->send_max_data) { > > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic send flow hit MAX_STREAM_DATA"); > > - return 0; > > + frame->level = ssl_encryption_application; > > + frame->type = NGX_QUIC_FT_STREAM; > > + frame->data = out; > > + > > + frame->u.stream.off = 1; > > + frame->u.stream.len = 1; > > + frame->u.stream.fin = last; > > + > > + frame->u.stream.stream_id = qs->id; > > + frame->u.stream.offset = qs->send_offset; > > + frame->u.stream.length = len; > > + > > + ngx_quic_queue_frame(qc, frame); > > + > > + qs->send_offset += len; > > + qc->streams.send_offset += len; > > + > > + ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > + "quic stream id:0x%xL flush len:%uz last:%ui", > > + qs->id, len, last); > > + > > + if (qs->connection == NULL) { > > + return ngx_quic_close_stream(qs); > > } > > > > - if (sent + size > qs->send_max_data) { > > - size = qs->send_max_data - sent; > > - } > > - > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic send flow:%uz", size); > > - > > - return size; > > + return NGX_OK; > > } > > > > > > @@ -953,40 +981,67 @@ 
ngx_quic_stream_cleanup_handler(void *da > > { > > ngx_connection_t *c = data; > > > > + ngx_quic_stream_t *qs; > > + > > + qs = c->quic; > > + > > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, > > + "quic stream id:0x%xL cleanup", qs->id); > > + > > + if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) { > > + ngx_quic_close_connection(c, NGX_ERROR); > > + return; > > + } > > + > > + qs->connection = NULL; > > + > > + if (ngx_quic_close_stream(qs) != NGX_OK) { > > + ngx_quic_close_connection(c, NGX_ERROR); > > + return; > > + } > > +} > > + > > + > > +static ngx_int_t > > +ngx_quic_close_stream(ngx_quic_stream_t *qs) > > +{ > > ngx_connection_t *pc; > > ngx_quic_frame_t *frame; > > - ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > > > - qs = c->quic; > > pc = qs->parent; > > qc = ngx_quic_get_connection(pc); > > > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic stream id:0x%xL cleanup", qs->id); > > + if (!qc->closing) { > > + /* make sure everything is sent and final size is received */ > > + > > + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV > > + || qs->send_state == NGX_QUIC_STREAM_SEND_READY > > + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND) > > + { > > + return NGX_OK; > > + } > > + } > > + > > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > + "quic stream id:0x%xL close", qs->id); > > + > > + ngx_quic_free_chain(pc, qs->in); > > + ngx_quic_free_chain(pc, qs->out); > > > > ngx_rbtree_delete(&qc->streams.tree, &qs->node); > > - ngx_quic_free_chain(pc, qs->in); > > - ngx_quic_free_chain(pc, qs->out); > > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > > > > if (qc->closing) { > > /* schedule handler call to continue ngx_quic_close_connection() */ > > ngx_post_event(pc->read, &ngx_posted_events); > > - return; > > + return NGX_OK; > > } > > > > - if (qc->error) { > > - goto done; > > - } > > - > > - (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN); > > - > > - (void) 
ngx_quic_update_flow(c, qs->recv_last); > > - > > if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) { > > frame = ngx_quic_alloc_frame(pc); > > if (frame == NULL) { > > - goto done; > > + return NGX_ERROR; > > } > > > > frame->level = ssl_encryption_application; > > @@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *da > > ngx_quic_queue_frame(qc, frame); > > } > > > > -done: > > - > > - (void) ngx_quic_output(pc); > > - > > if (qc->shutdown) { > > ngx_post_event(pc->read, &ngx_posted_events); > > } > > + > > + return NGX_OK; > > } > > > > > > @@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connect > > { > > size_t size; > > uint64_t last; > > - ngx_connection_t *sc; > > ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > ngx_quic_stream_frame_t *f; > > @@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connect > > return NGX_OK; > > } > > > > - sc = qs->connection; > > - > > if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV > > && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) > > { > > return NGX_OK; > > } > > > > - if (ngx_quic_control_flow(sc, last) != NGX_OK) { > > + if (ngx_quic_control_flow(qs, last) != NGX_OK) { > > return NGX_ERROR; > > } > > > > - if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { > > + if (qs->recv_final_size != (uint64_t) -1 && last > > > qs->recv_final_size) { > > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; > > return NGX_ERROR; > > } > > @@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connect > > } > > > > if (f->fin) { > > - if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { > > + if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != > > last) > > + { > > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; > > return NGX_ERROR; > > } > > @@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connect > > return NGX_ERROR; > > } > > > > - qs->final_size = last; > > + qs->recv_final_size = last; > > qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN; > > } > > > > @@ 
-1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connect > > qs->recv_size += size; > > > > if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN > > - && qs->recv_size == qs->final_size) > > + && qs->recv_size == qs->recv_final_size) > > { > > qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD; > > } > > > > + if (qs->connection == NULL) { > > + return ngx_quic_close_stream(qs); > > + } > > + > > if (f->offset == qs->recv_offset) { > > - ngx_quic_set_event(sc->read); > > + ngx_quic_set_event(qs->connection->read); > > } > > > > return NGX_OK; > > @@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_conne > > return NGX_OK; > > } > > > > - if (tree->root != tree->sentinel > > - && qc->streams.sent >= qc->streams.send_max_data) > > + if (tree->root == tree->sentinel > > + || qc->streams.send_offset < qc->streams.send_max_data) > > { > > - > > - for (node = ngx_rbtree_min(tree->root, tree->sentinel); > > - node; > > - node = ngx_rbtree_next(tree, node)) > > - { > > - qs = (ngx_quic_stream_t *) node; > > - ngx_quic_set_event(qs->connection->write); > > - } > > + /* not blocked on MAX_DATA */ > > + qc->streams.send_max_data = f->max_data; > > + return NGX_OK; > > } > > > > qc->streams.send_max_data = f->max_data; > > + node = ngx_rbtree_min(tree->root, tree->sentinel); > > + > > + while (node && qc->streams.send_offset < qc->streams.send_max_data) { > > + > > + qs = (ngx_quic_stream_t *) node; > > + node = ngx_rbtree_next(tree, node); > > + > > + if (ngx_quic_stream_flush(qs) != NGX_OK) { > > + return NGX_ERROR; > > + } > > + } > > > > return NGX_OK; > > } > > @@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_fram > > return NGX_OK; > > } > > > > - return ngx_quic_update_max_stream_data(qs->connection); > > + return ngx_quic_update_max_stream_data(qs); > > } > > > > > > @@ -1197,7 +1258,6 @@ ngx_int_t > > ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c, > > ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f) > > { > > - uint64_t 
sent; > > ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > > > @@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ng > > return NGX_OK; > > } > > > > - sent = qs->connection->sent; > > - > > - if (sent >= qs->send_max_data) { > > - ngx_quic_set_event(qs->connection->write); > > + if (qs->send_offset < qs->send_max_data) { > > + /* not blocked on MAX_STREAM_DATA */ > > + qs->send_max_data = f->limit; > > + return NGX_OK; > > } > > > > qs->send_max_data = f->limit; > > > > - return NGX_OK; > > + return ngx_quic_stream_flush(qs); > > } > > > > > > @@ -1240,7 +1300,6 @@ ngx_int_t > > ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, > > ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) > > { > > - ngx_connection_t *sc; > > ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > > > @@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c > > > > qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; > > > > - sc = qs->connection; > > - > > - if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { > > + if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) { > > return NGX_ERROR; > > } > > > > - if (qs->final_size != (uint64_t) -1 && qs->final_size != > > f->final_size) { > > + if (qs->recv_final_size != (uint64_t) -1 > > + && qs->recv_final_size != f->final_size) > > + { > > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; > > return NGX_ERROR; > > } > > @@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_c > > return NGX_ERROR; > > } > > > > - qs->final_size = f->final_size; > > + qs->recv_final_size = f->final_size; > > > > - if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { > > + if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) { > > return NGX_ERROR; > > } > > > > + if (qs->connection == NULL) { > > + return ngx_quic_close_stream(qs); > > + } > > + > > ngx_quic_set_event(qs->connection->read); > > > > return NGX_OK; > > @@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c > > return 
NGX_OK; > > } > > > > - if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) { > > + if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) { > > return NGX_ERROR; > > } > > > > + if (qs->connection == NULL) { > > + return ngx_quic_close_stream(qs); > > + } > > + > > ngx_quic_set_event(qs->connection->write); > > > > return NGX_OK; > > @@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connectio > > return; > > } > > > > + if (qs->connection == NULL) { > > + qs->acked += f->u.stream.length; > > + return; > > + } > > + > > sent = qs->connection->sent; > > unacked = sent - qs->acked; > > + qs->acked += f->u.stream.length; > > > > - if (unacked >= qc->conf->stream_buffer_size) { > > - ngx_quic_set_event(qs->connection->write); > > + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, > > + "quic stream id:0x%xL ack len:%uL acked:%uL > > unacked:%uL", > > + qs->id, f->u.stream.length, qs->acked, sent - > > qs->acked); > > + > > + if (unacked != qc->conf->stream_buffer_size) { > > + /* not blocked on buffer size */ > > + return; > > } > > > > - qs->acked += f->u.stream.length; > > - > > - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, > > - "quic stream ack len:%uL acked:%uL unacked:%uL", > > - f->u.stream.length, qs->acked, sent - qs->acked); > > + ngx_quic_set_event(qs->connection->write); > > } > > > > > > static ngx_int_t > > -ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) > > +ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last) > > { > > uint64_t len; > > - ngx_quic_stream_t *qs; > > + ngx_connection_t *pc; > > ngx_quic_connection_t *qc; > > > > - qs = c->quic; > > - qc = ngx_quic_get_connection(qs->parent); > > + pc = qs->parent; > > + qc = ngx_quic_get_connection(pc); > > > > if (last <= qs->recv_last) { > > return NGX_OK; > > @@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t * > > > > len = last - qs->recv_last; > > > > - ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic flow control 
msd:%uL/%uL md:%uL/%uL", > > - last, qs->recv_max_data, qc->streams.recv_last + len, > > + ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > + "quic stream id:0x%xL flow control msd:%uL/%uL > > md:%uL/%uL", > > + qs->id, last, qs->recv_max_data, qc->streams.recv_last > > + len, > > qc->streams.recv_max_data); > > > > qs->recv_last += len; > > @@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t * > > > > > > static ngx_int_t > > -ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) > > +ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last) > > { > > uint64_t len; > > ngx_connection_t *pc; > > - ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > > > - qs = c->quic; > > pc = qs->parent; > > qc = ngx_quic_get_connection(pc); > > > > @@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c > > > > len = last - qs->recv_offset; > > > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic flow update %uL", last); > > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > + "quic stream id:0x%xL flow update %uL", qs->id, last); > > > > qs->recv_offset += len; > > > > if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) { > > - if (ngx_quic_update_max_stream_data(c) != NGX_OK) { > > + if (ngx_quic_update_max_stream_data(qs) != NGX_OK) { > > return NGX_ERROR; > > } > > } > > @@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c > > > > > > static ngx_int_t > > -ngx_quic_update_max_stream_data(ngx_connection_t *c) > > +ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs) > > { > > uint64_t recv_max_data; > > ngx_connection_t *pc; > > ngx_quic_frame_t *frame; > > - ngx_quic_stream_t *qs; > > ngx_quic_connection_t *qc; > > > > - qs = c->quic; > > pc = qs->parent; > > qc = ngx_quic_get_connection(pc); > > > > @@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_conn > > > > qs->recv_max_data = recv_max_data; > > > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > - "quic flow update msd:%uL", 
qs->recv_max_data); > > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > > + "quic stream id:0x%xL flow update msd:%uL", > > + qs->id, qs->recv_max_data); > > > > frame = ngx_quic_alloc_frame(pc); > > if (frame == NULL) { > > diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c > > --- a/src/http/v3/ngx_http_v3_uni.c > > +++ b/src/http/v3/ngx_http_v3_uni.c > > @@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_ > > } > > > > > > -/* XXX async & buffered stream writes */ > > - > > ngx_connection_t * > > ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id) > > { > > > _______________________________________________ > > nginx-devel mailing list -- nginx-devel@nginx.org > > To unsubscribe send an email to nginx-devel-le...@nginx.org -- Roman Arutyunyan
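
For reference, the argument in the reply above (a zero flow control limit should not short-circuit the flush, because a fin may still be sent) can be sketched with a small standalone model. This is only an illustration under stated assumptions: stream_model_t, flush_result_t, flush_model(), min_u64(), the buffered and fin_sent fields and FINAL_SIZE_UNKNOWN are hypothetical names that do not exist in nginx, and the model mirrors just the limit/fin arithmetic of the quoted ngx_quic_stream_flush(), not chains, frames or send states.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; these are NOT the nginx types. */
#define FINAL_SIZE_UNKNOWN  ((uint64_t) -1)

typedef struct {
    uint64_t  send_offset;      /* bytes already placed into STREAM frames */
    uint64_t  send_max_data;    /* stream-level credit granted by the peer */
    uint64_t  send_final_size;  /* FINAL_SIZE_UNKNOWN until send shutdown */
    uint64_t  buffered;         /* bytes still waiting in the output buffer */
    int       fin_sent;         /* 1 once a frame carrying fin was emitted */
} stream_model_t;

typedef struct {
    int       emit;             /* 1 if a STREAM frame would be queued */
    int       fin;              /* fin bit of that frame */
    uint64_t  len;              /* payload length of that frame */
} flush_result_t;

static uint64_t
min_u64(uint64_t a, uint64_t b)
{
    return a < b ? a : b;
}

/*
 * Models one flush attempt.  conn_offset and conn_max_data play the role
 * of the connection-level send offset and MAX_DATA limit.
 */
static flush_result_t
flush_model(stream_model_t *s, uint64_t *conn_offset, uint64_t conn_max_data)
{
    uint64_t        limit;
    flush_result_t  r = { 0, 0, 0 };

    assert(*conn_offset <= conn_max_data);
    assert(s->send_offset <= s->send_max_data);

    if (s->fin_sent) {
        return r;
    }

    /* the tighter of connection-level and stream-level credit */
    limit = min_u64(conn_max_data - *conn_offset,
                    s->send_max_data - s->send_offset);

    /* deliberately no early return on limit == 0 */
    r.len = min_u64(limit, s->buffered);

    if (s->send_final_size != FINAL_SIZE_UNKNOWN
        && s->send_final_size == s->send_offset + r.len)
    {
        r.fin = 1;
    }

    /* nothing to send: no data fits and no fin is due */
    if (r.len == 0 && !r.fin) {
        return r;
    }

    r.emit = 1;
    s->fin_sent = r.fin;
    s->buffered -= r.len;
    s->send_offset += r.len;
    *conn_offset += r.len;

    return r;
}
```

With send_offset, send_max_data and send_final_size all equal to 100 and nothing buffered, the limit is zero, yet the model still reports an empty frame with fin set. With the final size unknown and data buffered, the same zero limit produces no frame at all, which is the case where DATA_BLOCKED/STREAM_DATA_BLOCKED would be relevant.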