On Mon, Feb 07, 2022 at 05:16:17PM +0300, Roman Arutyunyan wrote: > Hi, > > On Fri, Feb 04, 2022 at 04:56:23PM +0300, Vladimir Homutov wrote: > > On Tue, Feb 01, 2022 at 04:39:59PM +0300, Roman Arutyunyan wrote: > > > # HG changeset patch > > > # User Roman Arutyunyan <a...@nginx.com> > > > # Date 1643722727 -10800 > > > # Tue Feb 01 16:38:47 2022 +0300 > > > # Branch quic > > > # Node ID db31ae16c1f2050be9c9f6b1f117ab6725b97dd4 > > > # Parent 308ac307b3e6952ef0c5ccf10cc82904c59fa4c3 > > > QUIC: stream lingering. > > > > > > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it > > > can persist after connection is closed by application. During this > > > period, > > > server is expecting stream final size from client for correct flow > > > control. > > > Also, buffered output is sent to client as more flow control credit is > > > granted. > > > > > [..] > > > > > +static ngx_int_t > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs) > > > +{ > > > + size_t limit, len; > > > + ngx_uint_t last; > > > + ngx_chain_t *out, *cl; > > > + ngx_quic_frame_t *frame; > > > + ngx_connection_t *pc; > > > + ngx_quic_connection_t *qc; > > > + > > > + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) { > > > + return NGX_OK; > > > + } > > > + > > > + pc = qs->parent; > > > + qc = ngx_quic_get_connection(pc); > > > + > > > + limit = ngx_quic_max_stream_flow(qs); > > > + last = 0; > > > + > > > + out = ngx_quic_read_chain(pc, &qs->out, limit); > > > + if (out == NGX_CHAIN_ERROR) { > > > + return NGX_ERROR; > > > + } > > > + > > > + len = 0; > > > + last = 0; > > > > this assignment looks duplicate. > > Thanks, fixed. > > > [..] 
> > > > > +static ngx_int_t > > > +ngx_quic_close_stream(ngx_quic_stream_t *qs) > > > +{ > > > ngx_connection_t *pc; > > > ngx_quic_frame_t *frame; > > > - ngx_quic_stream_t *qs; > > > ngx_quic_connection_t *qc; > > > > > > - qs = c->quic; > > > pc = qs->parent; > > > qc = ngx_quic_get_connection(pc); > > > > > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > > > - "quic stream id:0x%xL cleanup", qs->id); > > > + if (!qc->closing) { > > > + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV > > > + || qs->send_state == NGX_QUIC_STREAM_SEND_READY > > > + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND) > > > + { > > > > so basically these are the states where we need to wait for FIN? > > and thus avoid closing till we get it. > > I would add a comment here. > > On the receiving end we wait either for fin or for reset to have final size. > On the sending end we wait for everything that's buffered to be sent. > Added a comment about that. > > > [..] > > > + if (qs->connection == NULL) { > > > + return ngx_quic_close_stream(qs); > > > + } > > > + > > > ngx_quic_set_event(qs->connection->write); > > > > this pattern - check connection, close if NULL and set event seems to > > repeat. Maybe it's worth to try to put this check/action into > > ngx_quic_set_event somehow ? we could instead have > > set_read_event/set_write_event maybe. > > I thought about this too, but it's not always that simple. And even if it > was, > the new function/macro would have unclear semantics. Let's just remember this > as a possible future optimization. > > > > +static ngx_int_t > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs) > > > + > > [..] > > > + if (len == 0 && !last) { > > > + return NGX_OK; > > > + } > > > + > > > + frame = ngx_quic_alloc_frame(pc); > > > + if (frame == NULL) { > > > + return NGX_ERROR; > > > + } > > > + > > > + frame = ngx_quic_alloc_frame(pc); > > > + if (frame == NULL) { > > > + return NGX_ERROR; > > > + } > > > > one more dup here. > > Yes, thanks. 
> > > Overall, it looks good, but the testing revealed another issue: with big > > buffer sizes we run into issue of too long chains in ngx_quic_write_chain(). > > As discussed, this certainly needs optimization - probably adding some > > pointer to the end to facilitate appending, or something else. > > It's true ngx_quic_write_chain() needs to be optimized. When the buffered > chain is big, it takes too much time to find the write point. I'll address > this in a separate patch. Meanwhile, attached is an updated version of the > current one. > > In the new version of the patch I also eliminated the > ngx_quic_max_stream_flow() function and embedded its content in > ngx_quic_stream_flush().
yes, this looks correct - flow limit should not consider buffer as it was before. I think we should check for limit == 0 before doing read_chain and this is good place for debug logging about 'hit MAX_DATA/MAX_STREAM_DATA' that was removed by update. > > -- > Roman Arutyunyan > # HG changeset patch > # User Roman Arutyunyan <a...@nginx.com> > # Date 1644054894 -10800 > # Sat Feb 05 12:54:54 2022 +0300 > # Branch quic > # Node ID 6e1674c257709341a7508ae4bdab6f7f7d2e9284 > # Parent 6c1dfd072859022f830aeea49db7cbe3c9f7fb55 > QUIC: stream lingering. > > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it > can persist after connection is closed by application. During this period, > server is expecting stream final size from client for correct flow control. > Also, buffered output is sent to client as more flow control credit is > granted. > > diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c > --- a/src/event/quic/ngx_event_quic.c > +++ b/src/event/quic/ngx_event_quic.c > @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t > ctp->active_connection_id_limit = 2; > > ngx_queue_init(&qc->streams.uninitialized); > + ngx_queue_init(&qc->streams.free); > > qc->streams.recv_max_data = qc->tp.initial_max_data; > qc->streams.recv_window = qc->streams.recv_max_data; > diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h > --- a/src/event/quic/ngx_event_quic.h > +++ b/src/event/quic/ngx_event_quic.h > @@ -78,12 +78,14 @@ struct ngx_quic_stream_s { > uint64_t id; > uint64_t acked; > uint64_t send_max_data; > + uint64_t send_offset; > + uint64_t send_final_size; > uint64_t recv_max_data; > uint64_t recv_offset; > uint64_t recv_window; > uint64_t recv_last; > uint64_t recv_size; > - uint64_t final_size; > + uint64_t recv_final_size; > ngx_chain_t *in; > ngx_chain_t *out; > ngx_uint_t cancelable; /* unsigned cancelable:1; */ > diff --git a/src/event/quic/ngx_event_quic_connection.h > 
b/src/event/quic/ngx_event_quic_connection.h > --- a/src/event/quic/ngx_event_quic_connection.h > +++ b/src/event/quic/ngx_event_quic_connection.h > @@ -114,13 +114,16 @@ struct ngx_quic_socket_s { > typedef struct { > ngx_rbtree_t tree; > ngx_rbtree_node_t sentinel; > + > ngx_queue_t uninitialized; > + ngx_queue_t free; > > uint64_t sent; > uint64_t recv_offset; > uint64_t recv_window; > uint64_t recv_last; > uint64_t recv_max_data; > + uint64_t send_offset; > uint64_t send_max_data; > > uint64_t server_max_streams_uni; > diff --git a/src/event/quic/ngx_event_quic_frames.c > b/src/event/quic/ngx_event_quic_frames.c > --- a/src/event/quic/ngx_event_quic_frames.c > +++ b/src/event/quic/ngx_event_quic_frames.c > @@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c > return NGX_ERROR; > } > > + if (f->type == NGX_QUIC_FT_STREAM) { > + f->u.stream.fin = 0; > + } > + > ngx_queue_insert_after(&f->queue, &nf->queue); > > return NGX_OK; > diff --git a/src/event/quic/ngx_event_quic_streams.c > b/src/event/quic/ngx_event_quic_streams.c > --- a/src/event/quic/ngx_event_quic_streams.c > +++ b/src/event/quic/ngx_event_quic_streams.c > @@ -13,6 +13,8 @@ > #define NGX_QUIC_STREAM_GONE (void *) -1 > > > +static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, > + ngx_uint_t err); > static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c); > static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c); > static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t > id); > @@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_ > size_t size); > static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, > ngx_chain_t *in, off_t limit); > -static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); > +static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs); > static void ngx_quic_stream_cleanup_handler(void *data); > -static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last); > -static ngx_int_t 
ngx_quic_update_flow(ngx_connection_t *c, uint64_t last); > -static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c); > +static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs); > +static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last); > +static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last); > +static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs); > static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c); > static void ngx_quic_set_event(ngx_event_t *ev); > > @@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t > ns = 0; > #endif > > - for (node = ngx_rbtree_min(tree->root, tree->sentinel); > - node; > - node = ngx_rbtree_next(tree, node)) > - { > + node = ngx_rbtree_min(tree->root, tree->sentinel); > + > + while (node) { > qs = (ngx_quic_stream_t *) node; > + node = ngx_rbtree_next(tree, node); > > qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; > qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; > > + if (qs->connection == NULL) { > + ngx_quic_close_stream(qs); > + continue; > + } > + > ngx_quic_set_event(qs->connection->read); > ngx_quic_set_event(qs->connection->write); > > @@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t > ngx_int_t > ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) > { > + return ngx_quic_do_reset_stream(c->quic, err); > +} > + > + > +static ngx_int_t > +ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err) > +{ > ngx_connection_t *pc; > ngx_quic_frame_t *frame; > - ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > > - qs = c->quic; > - > if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD > || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT > || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) > @@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t * > } > > qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; > + qs->send_final_size = qs->send_offset; > > pc = qs->parent; > qc = 
ngx_quic_get_connection(pc); > > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, > + "quic stream id:0x%xL reset", qs->id); > + > frame = ngx_quic_alloc_frame(pc); > if (frame == NULL) { > return NGX_ERROR; > @@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t * > frame->type = NGX_QUIC_FT_RESET_STREAM; > frame->u.reset_stream.id = qs->id; > frame->u.reset_stream.error_code = err; > - frame->u.reset_stream.final_size = c->sent; > + frame->u.reset_stream.final_size = qs->send_offset; > > ngx_quic_queue_frame(qc, frame); > > + ngx_quic_free_chain(pc, qs->out); > + qs->out = NULL; > + > return NGX_OK; > } > > @@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_ > static ngx_int_t > ngx_quic_shutdown_stream_send(ngx_connection_t *c) > { > - ngx_connection_t *pc; > - ngx_quic_frame_t *frame; > - ngx_quic_stream_t *qs; > - ngx_quic_connection_t *qc; > + ngx_quic_stream_t *qs; > > qs = c->quic; > > @@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connec > return NGX_OK; > } > > - qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; > - > - pc = qs->parent; > - qc = ngx_quic_get_connection(pc); > + qs->send_state = NGX_QUIC_STREAM_SEND_SEND; > + qs->send_final_size = c->sent; > > - frame = ngx_quic_alloc_frame(pc); > - if (frame == NULL) { > - return NGX_ERROR; > - } > - > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, > "quic stream id:0x%xL send shutdown", qs->id); > > - frame->level = ssl_encryption_application; > - frame->type = NGX_QUIC_FT_STREAM; > - frame->u.stream.off = 1; > - frame->u.stream.len = 1; > - frame->u.stream.fin = 1; > - > - frame->u.stream.stream_id = qs->id; > - frame->u.stream.offset = c->sent; > - frame->u.stream.length = 0; > - > - ngx_quic_queue_frame(qc, frame); > - > - return NGX_OK; > + return ngx_quic_stream_flush(qs); > } > > > @@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connec > return NGX_ERROR; > } > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, 
c->log, 0, > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, > "quic stream id:0x%xL recv shutdown", qs->id); > > frame->level = ssl_encryption_application; > @@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t > { > ngx_log_t *log; > ngx_pool_t *pool; > + ngx_queue_t *q; > ngx_connection_t *sc; > ngx_quic_stream_t *qs; > ngx_pool_cleanup_t *cln; > @@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t > > qc = ngx_quic_get_connection(c); > > - pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); > - if (pool == NULL) { > - return NULL; > + if (!ngx_queue_empty(&qc->streams.free)) { > + q = ngx_queue_head(&qc->streams.free); > + qs = ngx_queue_data(q, ngx_quic_stream_t, queue); > + ngx_queue_remove(&qs->queue); > + > + } else { > + /* > + * the number of streams is limited by transport > + * parameters and application requirements > + */ > + > + qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t)); > + if (qs == NULL) { > + return NULL; > + } > } > > - qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t)); > - if (qs == NULL) { > - ngx_destroy_pool(pool); > - return NULL; > - } > + ngx_memzero(qs, sizeof(ngx_quic_stream_t)); > > qs->node.key = id; > qs->parent = c; > qs->id = id; > - qs->final_size = (uint64_t) -1; > + qs->send_final_size = (uint64_t) -1; > + qs->recv_final_size = (uint64_t) -1; > + > + pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); > + if (pool == NULL) { > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > + return NULL; > + } > > log = ngx_palloc(pool, sizeof(ngx_log_t)); > if (log == NULL) { > ngx_destroy_pool(pool); > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > return NULL; > } > > @@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t > sc = ngx_get_connection(c->fd, log); > if (sc == NULL) { > ngx_destroy_pool(pool); > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > return NULL; > } > > @@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t > if (cln == NULL) { > 
ngx_close_connection(sc); > ngx_destroy_pool(pool); > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > return NULL; > } > > @@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c > return NGX_ERROR; > } > > - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > "quic stream id:0x%xL recv buf:%uz", qs->id, size); > > if (size == 0) { > @@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c > rev->ready = 0; > > if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD > - && qs->recv_offset == qs->final_size) > + && qs->recv_offset == qs->recv_final_size) > { > qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; > } > @@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c > ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, > "quic stream id:0x%xL recv len:%z", qs->id, len); > > - if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) { > + if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) { > return NGX_ERROR; > } > > @@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connectio > off_t flow; > size_t n; > ngx_event_t *wev; > - ngx_chain_t *out; > ngx_connection_t *pc; > - ngx_quic_frame_t *frame; > ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > > @@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connectio > > qs->send_state = NGX_QUIC_STREAM_SEND_SEND; > > - flow = ngx_quic_max_stream_flow(c); > + flow = qs->acked + qc->conf->stream_buffer_size - c->sent; > + > if (flow == 0) { > wev->ready = 0; > return in; > @@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connectio > limit = flow; > } > > - in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n); > + in = ngx_quic_write_chain(pc, &qs->out, in, limit, > + c->sent - qs->send_offset, &n); > if (in == NGX_CHAIN_ERROR) { > return NGX_CHAIN_ERROR; > } > > - out = ngx_quic_read_chain(pc, &qs->out, n); > - if (out == NGX_CHAIN_ERROR) { > - return NGX_CHAIN_ERROR; > - } > - > - frame = ngx_quic_alloc_frame(pc); > - if (frame 
== NULL) { > - return NGX_CHAIN_ERROR; > - } > - > - frame->level = ssl_encryption_application; > - frame->type = NGX_QUIC_FT_STREAM; > - frame->data = out; > - frame->u.stream.off = 1; > - frame->u.stream.len = 1; > - frame->u.stream.fin = 0; > - > - frame->u.stream.stream_id = qs->id; > - frame->u.stream.offset = c->sent; > - frame->u.stream.length = n; > - > c->sent += n; > qc->streams.sent += n; > > - ngx_quic_queue_frame(qc, frame); > - > if (in) { > wev->ready = 0; > } > @@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connectio > ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > "quic send_chain sent:%uz", n); > > + if (ngx_quic_stream_flush(qs) != NGX_OK) { > + return NGX_CHAIN_ERROR; > + } > + > return in; > } > > > -static size_t > -ngx_quic_max_stream_flow(ngx_connection_t *c) > +static ngx_int_t > +ngx_quic_stream_flush(ngx_quic_stream_t *qs) > { > - size_t size; > - uint64_t sent, unacked; > - ngx_quic_stream_t *qs; > + off_t limit; > + size_t len; > + ngx_uint_t last; > + ngx_chain_t *out, *cl; > + ngx_quic_frame_t *frame; > + ngx_connection_t *pc; > ngx_quic_connection_t *qc; > > - qs = c->quic; > - qc = ngx_quic_get_connection(qs->parent); > + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) { > + return NGX_OK; > + } > > - size = qc->conf->stream_buffer_size; > - sent = c->sent; > - unacked = sent - qs->acked; > + pc = qs->parent; > + qc = ngx_quic_get_connection(pc); > > if (qc->streams.send_max_data == 0) { > qc->streams.send_max_data = qc->ctp.initial_max_data; > } > > - if (unacked >= size) { > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic send flow hit buffer size"); > - return 0; > + limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset, > + qs->send_max_data - qs->send_offset); > + > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > + "quic stream id:0x%xL flush limit:%O", qs->id, limit); > + > + out = ngx_quic_read_chain(pc, &qs->out, limit); > + if (out == NGX_CHAIN_ERROR) { > + return NGX_ERROR; > } > > - 
size -= unacked; > + len = 0; > + last = 0; > + > + for (cl = out; cl; cl = cl->next) { > + len += cl->buf->last - cl->buf->pos; > + } > > - if (qc->streams.sent >= qc->streams.send_max_data) { > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic send flow hit MAX_DATA"); > - return 0; > + if (qs->send_final_size != (uint64_t) -1 > + && qs->send_final_size == qs->send_offset + len) > + { > + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; > + last = 1; > + } > + > + if (len == 0 && !last) { > + return NGX_OK; > } > > - if (qc->streams.sent + size > qc->streams.send_max_data) { > - size = qc->streams.send_max_data - qc->streams.sent; > + frame = ngx_quic_alloc_frame(pc); > + if (frame == NULL) { > + return NGX_ERROR; > } > > - if (sent >= qs->send_max_data) { > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic send flow hit MAX_STREAM_DATA"); > - return 0; > + frame->level = ssl_encryption_application; > + frame->type = NGX_QUIC_FT_STREAM; > + frame->data = out; > + > + frame->u.stream.off = 1; > + frame->u.stream.len = 1; > + frame->u.stream.fin = last; > + > + frame->u.stream.stream_id = qs->id; > + frame->u.stream.offset = qs->send_offset; > + frame->u.stream.length = len; > + > + ngx_quic_queue_frame(qc, frame); > + > + qs->send_offset += len; > + qc->streams.send_offset += len; > + > + ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0, > + "quic stream id:0x%xL flush len:%uz last:%ui", > + qs->id, len, last); > + > + if (qs->connection == NULL) { > + return ngx_quic_close_stream(qs); > } > > - if (sent + size > qs->send_max_data) { > - size = qs->send_max_data - sent; > - } > - > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic send flow:%uz", size); > - > - return size; > + return NGX_OK; > } > > > @@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *da > { > ngx_connection_t *c = data; > > + ngx_quic_stream_t *qs; > + > + qs = c->quic; > + > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, > + "quic stream id:0x%xL 
cleanup", qs->id); > + > + if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) { > + ngx_quic_close_connection(c, NGX_ERROR); > + return; > + } > + > + qs->connection = NULL; > + > + if (ngx_quic_close_stream(qs) != NGX_OK) { > + ngx_quic_close_connection(c, NGX_ERROR); > + return; > + } > +} > + > + > +static ngx_int_t > +ngx_quic_close_stream(ngx_quic_stream_t *qs) > +{ > ngx_connection_t *pc; > ngx_quic_frame_t *frame; > - ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > > - qs = c->quic; > pc = qs->parent; > qc = ngx_quic_get_connection(pc); > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic stream id:0x%xL cleanup", qs->id); > + if (!qc->closing) { > + /* make sure everything is sent and final size is received */ > + > + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV > + || qs->send_state == NGX_QUIC_STREAM_SEND_READY > + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND) > + { > + return NGX_OK; > + } > + } > + > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, > + "quic stream id:0x%xL close", qs->id); > + > + ngx_quic_free_chain(pc, qs->in); > + ngx_quic_free_chain(pc, qs->out); > > ngx_rbtree_delete(&qc->streams.tree, &qs->node); > - ngx_quic_free_chain(pc, qs->in); > - ngx_quic_free_chain(pc, qs->out); > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); > > if (qc->closing) { > /* schedule handler call to continue ngx_quic_close_connection() */ > ngx_post_event(pc->read, &ngx_posted_events); > - return; > + return NGX_OK; > } > > - if (qc->error) { > - goto done; > - } > - > - (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN); > - > - (void) ngx_quic_update_flow(c, qs->recv_last); > - > if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) { > frame = ngx_quic_alloc_frame(pc); > if (frame == NULL) { > - goto done; > + return NGX_ERROR; > } > > frame->level = ssl_encryption_application; > @@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *da > ngx_quic_queue_frame(qc, frame); > } > > -done: > - > - (void) 
ngx_quic_output(pc); > - > if (qc->shutdown) { > ngx_post_event(pc->read, &ngx_posted_events); > } > + > + return NGX_OK; > } > > > @@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connect > { > size_t size; > uint64_t last; > - ngx_connection_t *sc; > ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > ngx_quic_stream_frame_t *f; > @@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connect > return NGX_OK; > } > > - sc = qs->connection; > - > if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV > && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) > { > return NGX_OK; > } > > - if (ngx_quic_control_flow(sc, last) != NGX_OK) { > + if (ngx_quic_control_flow(qs, last) != NGX_OK) { > return NGX_ERROR; > } > > - if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { > + if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) { > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; > return NGX_ERROR; > } > @@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connect > } > > if (f->fin) { > - if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { > + if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != > last) > + { > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; > return NGX_ERROR; > } > @@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connect > return NGX_ERROR; > } > > - qs->final_size = last; > + qs->recv_final_size = last; > qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN; > } > > @@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connect > qs->recv_size += size; > > if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN > - && qs->recv_size == qs->final_size) > + && qs->recv_size == qs->recv_final_size) > { > qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD; > } > > + if (qs->connection == NULL) { > + return ngx_quic_close_stream(qs); > + } > + > if (f->offset == qs->recv_offset) { > - ngx_quic_set_event(sc->read); > + ngx_quic_set_event(qs->connection->read); > } > > return NGX_OK; > @@ -1128,20 
+1183,26 @@ ngx_quic_handle_max_data_frame(ngx_conne > return NGX_OK; > } > > - if (tree->root != tree->sentinel > - && qc->streams.sent >= qc->streams.send_max_data) > + if (tree->root == tree->sentinel > + || qc->streams.send_offset < qc->streams.send_max_data) > { > - > - for (node = ngx_rbtree_min(tree->root, tree->sentinel); > - node; > - node = ngx_rbtree_next(tree, node)) > - { > - qs = (ngx_quic_stream_t *) node; > - ngx_quic_set_event(qs->connection->write); > - } > + /* not blocked on MAX_DATA */ > + qc->streams.send_max_data = f->max_data; > + return NGX_OK; > } > > qc->streams.send_max_data = f->max_data; > + node = ngx_rbtree_min(tree->root, tree->sentinel); > + > + while (node && qc->streams.send_offset < qc->streams.send_max_data) { > + > + qs = (ngx_quic_stream_t *) node; > + node = ngx_rbtree_next(tree, node); > + > + if (ngx_quic_stream_flush(qs) != NGX_OK) { > + return NGX_ERROR; > + } > + } > > return NGX_OK; > } > @@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_fram > return NGX_OK; > } > > - return ngx_quic_update_max_stream_data(qs->connection); > + return ngx_quic_update_max_stream_data(qs); > } > > > @@ -1197,7 +1258,6 @@ ngx_int_t > ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c, > ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f) > { > - uint64_t sent; > ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > > @@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ng > return NGX_OK; > } > > - sent = qs->connection->sent; > - > - if (sent >= qs->send_max_data) { > - ngx_quic_set_event(qs->connection->write); > + if (qs->send_offset < qs->send_max_data) { > + /* not blocked on MAX_STREAM_DATA */ > + qs->send_max_data = f->limit; > + return NGX_OK; > } > > qs->send_max_data = f->limit; > > - return NGX_OK; > + return ngx_quic_stream_flush(qs); > } > > > @@ -1240,7 +1300,6 @@ ngx_int_t > ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, > ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) > 
{ > - ngx_connection_t *sc; > ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > > @@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c > > qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; > > - sc = qs->connection; > - > - if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { > + if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) { > return NGX_ERROR; > } > > - if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { > + if (qs->recv_final_size != (uint64_t) -1 > + && qs->recv_final_size != f->final_size) > + { > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; > return NGX_ERROR; > } > @@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_c > return NGX_ERROR; > } > > - qs->final_size = f->final_size; > + qs->recv_final_size = f->final_size; > > - if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { > + if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) { > return NGX_ERROR; > } > > + if (qs->connection == NULL) { > + return ngx_quic_close_stream(qs); > + } > + > ngx_quic_set_event(qs->connection->read); > > return NGX_OK; > @@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c > return NGX_OK; > } > > - if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) { > + if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) { > return NGX_ERROR; > } > > + if (qs->connection == NULL) { > + return ngx_quic_close_stream(qs); > + } > + > ngx_quic_set_event(qs->connection->write); > > return NGX_OK; > @@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connectio > return; > } > > + if (qs->connection == NULL) { > + qs->acked += f->u.stream.length; > + return; > + } > + > sent = qs->connection->sent; > unacked = sent - qs->acked; > + qs->acked += f->u.stream.length; > > - if (unacked >= qc->conf->stream_buffer_size) { > - ngx_quic_set_event(qs->connection->write); > + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, > + "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL", > + 
qs->id, f->u.stream.length, qs->acked, sent - qs->acked); > + > + if (unacked != qc->conf->stream_buffer_size) { > + /* not blocked on buffer size */ > + return; > } > > - qs->acked += f->u.stream.length; > - > - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, > - "quic stream ack len:%uL acked:%uL unacked:%uL", > - f->u.stream.length, qs->acked, sent - qs->acked); > + ngx_quic_set_event(qs->connection->write); > } > > > static ngx_int_t > -ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) > +ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last) > { > uint64_t len; > - ngx_quic_stream_t *qs; > + ngx_connection_t *pc; > ngx_quic_connection_t *qc; > > - qs = c->quic; > - qc = ngx_quic_get_connection(qs->parent); > + pc = qs->parent; > + qc = ngx_quic_get_connection(pc); > > if (last <= qs->recv_last) { > return NGX_OK; > @@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t * > > len = last - qs->recv_last; > > - ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic flow control msd:%uL/%uL md:%uL/%uL", > - last, qs->recv_max_data, qc->streams.recv_last + len, > + ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0, > + "quic stream id:0x%xL flow control msd:%uL/%uL > md:%uL/%uL", > + qs->id, last, qs->recv_max_data, qc->streams.recv_last + > len, > qc->streams.recv_max_data); > > qs->recv_last += len; > @@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t * > > > static ngx_int_t > -ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) > +ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last) > { > uint64_t len; > ngx_connection_t *pc; > - ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > > - qs = c->quic; > pc = qs->parent; > qc = ngx_quic_get_connection(pc); > > @@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c > > len = last - qs->recv_offset; > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic flow update %uL", last); > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > + "quic stream 
id:0x%xL flow update %uL", qs->id, last); > > qs->recv_offset += len; > > if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) { > - if (ngx_quic_update_max_stream_data(c) != NGX_OK) { > + if (ngx_quic_update_max_stream_data(qs) != NGX_OK) { > return NGX_ERROR; > } > } > @@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c > > > static ngx_int_t > -ngx_quic_update_max_stream_data(ngx_connection_t *c) > +ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs) > { > uint64_t recv_max_data; > ngx_connection_t *pc; > ngx_quic_frame_t *frame; > - ngx_quic_stream_t *qs; > ngx_quic_connection_t *qc; > > - qs = c->quic; > pc = qs->parent; > qc = ngx_quic_get_connection(pc); > > @@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_conn > > qs->recv_max_data = recv_max_data; > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, > - "quic flow update msd:%uL", qs->recv_max_data); > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, > + "quic stream id:0x%xL flow update msd:%uL", > + qs->id, qs->recv_max_data); > > frame = ngx_quic_alloc_frame(pc); > if (frame == NULL) { > diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c > --- a/src/http/v3/ngx_http_v3_uni.c > +++ b/src/http/v3/ngx_http_v3_uni.c > @@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_ > } > > > -/* XXX async & buffered stream writes */ > - > ngx_connection_t * > ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id) > { > _______________________________________________ > nginx-devel mailing list -- nginx-devel@nginx.org > To unsubscribe send an email to nginx-devel-le...@nginx.org _______________________________________________ nginx-devel mailing list -- nginx-devel@nginx.org To unsubscribe send an email to nginx-devel-le...@nginx.org