This is an automated email from the ASF dual-hosted git repository. masaori pushed a commit to branch quic-latest in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/quic-latest by this push: new 535134b Refactoring QUICStream & HQClientTransaction 535134b is described below commit 535134be1c2b83a4bb3fff0b2d6cfbb0d2288ec2 Author: Masaori Koshiba <masa...@apache.org> AuthorDate: Fri Dec 8 16:51:35 2017 +0900 Refactoring QUICStream & HQClientTransaction --- iocore/net/quic/QUICApplication.cc | 54 +++++-- iocore/net/quic/QUICApplication.h | 4 + iocore/net/quic/QUICFrame.h | 2 +- iocore/net/quic/QUICStream.cc | 253 +++++++++++++++++++---------- iocore/net/quic/QUICStream.h | 34 ++-- iocore/net/quic/QUICStreamManager.cc | 15 +- iocore/net/quic/test/test_QUICStream.cc | 18 +-- proxy/hq/HQClientSession.cc | 2 +- proxy/hq/HQClientTransaction.cc | 272 ++++++++++++++++++++++++-------- proxy/hq/HQClientTransaction.h | 37 +++-- proxy/hq/QUICSimpleApp.cc | 15 +- 11 files changed, 483 insertions(+), 223 deletions(-) diff --git a/iocore/net/quic/QUICApplication.cc b/iocore/net/quic/QUICApplication.cc index eca747f..5b8f3f4 100644 --- a/iocore/net/quic/QUICApplication.cc +++ b/iocore/net/quic/QUICApplication.cc @@ -31,14 +31,16 @@ static constexpr char tag[] = "quic_app"; // QUICStreamIO::QUICStreamIO(QUICApplication *app, QUICStream *stream) : _stream(stream) { - this->_read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_4K); - this->_write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_4K); + this->_read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K); + this->_write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K); this->_read_buffer_reader = _read_buffer->alloc_reader(); this->_write_buffer_reader = _write_buffer->alloc_reader(); - this->_read_vio = stream->do_io_read(app, 0, _read_buffer); - this->_write_vio = stream->do_io_write(app, 0, _write_buffer_reader); + this->_read_vio = stream->do_io_read(app, INT64_MAX, this->_read_buffer); + this->_read_vio->buffer.reader_for(this->_read_buffer_reader); + + this->_write_vio = stream->do_io_write(app, INT64_MAX, this->_write_buffer_reader); } int64_t @@ -71,14 +73,16 @@ QUICStreamIO::write(IOBufferReader *r, int64_t alen, int64_t offset) { SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread()); - if (this->_write_buffer->write_avail() > 0) { - int64_t bytes_add = this->_write_buffer->write(r, alen, offset); + int64_t bytes_avail = this->_write_buffer->write_avail(); + Debug(tag, "nbytes=%" PRId64 " ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64, this->_write_vio->nbytes, + this->_write_vio->ndone, bytes_avail, alen); - return bytes_add; + if (bytes_avail > 0) { + int64_t len = std::min(bytes_avail, alen); + int64_t bytes_added = this->_write_buffer->write(r, len, offset); + return bytes_added; } else { - Debug(tag, "write buffer is full"); - return 0; } } @@ -86,7 +90,7 @@ QUICStreamIO::write(IOBufferReader *r, int64_t alen, int64_t offset) void QUICStreamIO::set_write_vio_nbytes(int64_t nbytes) { - this->_write_vio->nbytes += nbytes; + this->_write_vio->nbytes = nbytes; } void @@ -119,6 +123,12 @@ QUICStreamIO::get_transaction_id() const return this->_stream->id(); } +bool +QUICStreamIO::is_vio(VIO *vio) +{ + return (this->_read_vio == vio || this->_write_vio == vio); +} + // // QUICApplication // @@ -178,3 +188,27 @@ QUICApplication::_find_stream_io(QUICStreamId id) return result->second; } } + +QUICStreamIO * +QUICApplication::_find_stream_io(VIO *vio) +{ + for (auto i : this->_stream_map) { + if (i.second->is_vio(vio)) { + return i.second; + } + } + + return nullptr; +} + +QUICStreamId +QUICApplication::_find_stream_id(VIO *vio) +{ + for (auto i : this->_stream_map) { + if (i.second->is_vio(vio)) { + return i.first; + } + } + + return 0; +} diff --git a/iocore/net/quic/QUICApplication.h b/iocore/net/quic/QUICApplication.h index 17e32e9..f8a37aa 100644 --- a/iocore/net/quic/QUICApplication.h +++ b/iocore/net/quic/QUICApplication.h @@ -49,6 +49,7 @@ public: IOBufferReader *get_read_buffer_reader(); void shutdown(); uint32_t get_transaction_id() const; + bool is_vio(VIO *); private: QUICStream *_stream = nullptr; @@ -80,6 +81,9 @@ public: protected: QUICStreamIO *_find_stream_io(QUICStreamId id); + // TODO: return pair + QUICStreamIO *_find_stream_io(VIO *vio); + QUICStreamId _find_stream_id(VIO *vio); QUICConnection *_client_qc = nullptr; diff --git a/iocore/net/quic/QUICFrame.h b/iocore/net/quic/QUICFrame.h index e92e094..9f51b3f 100644 --- a/iocore/net/quic/QUICFrame.h +++ b/iocore/net/quic/QUICFrame.h @@ -36,7 +36,7 @@ class QUICFrame public: QUICFrame(const uint8_t *buf, size_t len) : _buf(buf), _len(len){}; virtual QUICFrameType type() const; - virtual size_t size() const = 0; + virtual size_t size() const = 0; virtual void store(uint8_t *buf, size_t *len) const = 0; virtual void reset(const uint8_t *buf, size_t len); static QUICFrameType type(const uint8_t *buf); diff --git a/iocore/net/quic/QUICStream.cc b/iocore/net/quic/QUICStream.cc index 0d9214a..1ca315e 100644 --- a/iocore/net/quic/QUICStream.cc +++ b/iocore/net/quic/QUICStream.cc @@ -35,10 +35,25 @@ Debug("quic_flow_ctrl", "[%" PRIx64 "] [%" PRIx32 "] [%s] " fmt, static_cast<uint64_t>(this->_connection_id), this->_id, \ QUICDebugNames::stream_state(this->_state), ##__VA_ARGS__) +QUICStream::~QUICStream() +{ + if (this->_read_event) { + this->_read_event->cancel(); + this->_read_event = nullptr; + } + + if (this->_write_event) { + this->_write_event->cancel(); + this->_write_event = nullptr; + } +} + void QUICStream::init(QUICFrameTransmitter *tx, QUICConnectionId cid, QUICStreamId sid, uint64_t recv_max_stream_data, uint64_t send_max_stream_data) { + SET_HANDLER(&QUICStream::state_stream_open); + this->mutex = new_ProxyMutex(); this->_tx = tx; this->_connection_id = cid; @@ -51,12 +66,6 @@ QUICStream::init(QUICFrameTransmitter *tx, QUICConnectionId cid, QUICStreamId si } void -QUICStream::start() -{ - SET_HANDLER(&QUICStream::main_event_handler); -} - -void QUICStream::init_flow_control_params(uint32_t recv_max_stream_data, uint32_t send_max_stream_data) { this->_flow_control_buffer_size = recv_max_stream_data; @@ -82,7 +91,7 @@ QUICStream::final_offset() } int -QUICStream::main_event_handler(int event, void *data) +QUICStream::state_stream_open(int event, void *data) { QUICStreamDebug("%s", QUICDebugNames::vc_event(event)); QUICErrorUPtr error = std::unique_ptr<QUICError>(new QUICNoError()); @@ -90,20 +99,19 @@ QUICStream::main_event_handler(int event, void *data) switch (event) { case VC_EVENT_READ_READY: case VC_EVENT_READ_COMPLETE: { - this->_signal_read_event(true); - this->_read_event = nullptr; + int64_t len = this->_process_read_vio(); + if (len > 0) { + this->_signal_read_event(); + } break; } case VC_EVENT_WRITE_READY: case VC_EVENT_WRITE_COMPLETE: { - error = this->_send(); - this->_signal_write_event(true); - this->_write_event = nullptr; - - QUICStreamDebug("wvio.nbytes=%" PRId64 " wvio.ndone=%" PRId64 " wvio.read_avail=%" PRId64 " wvio.write_avail=%" PRId64, - this->_write_vio.nbytes, this->_write_vio.ndone, this->_write_vio.get_reader()->read_avail(), - this->_write_vio.get_writer()->write_avail()); + int64_t len = this->_process_write_vio(); + if (len > 0) { + this->_signal_write_event(); + } break; } @@ -111,6 +119,7 @@ QUICStream::main_event_handler(int event, void *data) case VC_EVENT_ERROR: case VC_EVENT_INACTIVITY_TIMEOUT: case VC_EVENT_ACTIVE_TIMEOUT: { + // TODO ink_assert(false); break; } @@ -138,7 +147,38 @@ QUICStream::main_event_handler(int event, void *data) } } - return EVENT_CONT; + return EVENT_DONE; +} + +int +QUICStream::state_stream_closed(int event, void *data) +{ + QUICStreamDebug("%s", QUICDebugNames::vc_event(event)); + + switch (event) { + case VC_EVENT_READ_READY: + case VC_EVENT_READ_COMPLETE: { + // ignore + break; + } + case VC_EVENT_WRITE_READY: + case VC_EVENT_WRITE_COMPLETE: { + // ignore + break; + } + case VC_EVENT_EOS: + case VC_EVENT_ERROR: + case VC_EVENT_INACTIVITY_TIMEOUT: + case VC_EVENT_ACTIVE_TIMEOUT: { + // TODO + ink_assert(false); + break; + } + default: + ink_assert(false); + } + + return EVENT_DONE; } VIO * @@ -157,8 +197,8 @@ QUICStream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) this->_read_vio.vc_server = this; this->_read_vio.op = VIO::READ; - // TODO: If read function is added, call reenable here - this->_read_vio.reenable(); + this->_process_read_vio(); + this->_send_tracked_event(this->_read_event, VC_EVENT_READ_READY, &this->_read_vio); return &this->_read_vio; } @@ -179,7 +219,8 @@ QUICStream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bo this->_write_vio.vc_server = this; this->_write_vio.op = VIO::WRITE; - this->_write_vio.reenable(); + this->_process_write_vio(); + this->_send_tracked_event(this->_write_event, VC_EVENT_WRITE_READY, &this->_write_vio); return &this->_write_vio; } @@ -187,6 +228,8 @@ QUICStream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bo void QUICStream::do_io_close(int lerrno) { + SET_HANDLER(&QUICStream::state_stream_closed); + this->_read_vio.buffer.clear(); this->_read_vio.nbytes = 0; this->_read_vio.op = VIO::NONE; @@ -202,83 +245,39 @@ void QUICStream::do_io_shutdown(ShutdownHowTo_t howto) { ink_assert(false); // unimplemented yet + return; } void QUICStream::reenable(VIO *vio) { if (vio->op == VIO::READ) { - SCOPED_MUTEX_LOCK(lock, this->_read_vio.mutex, this_ethread()); + QUICStreamDebug("read_vio reenabled"); - if (this->_read_vio.nbytes > 0) { - int event = (this->_read_vio.ntodo() == 0) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY; - - if (this->_read_event == nullptr) { - this->_read_event = this_ethread()->schedule_imm_local(this, event); - } + int64_t len = this->_process_read_vio(); + if (len > 0) { + this->_signal_read_event(); } } else if (vio->op == VIO::WRITE) { - SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread()); + QUICStreamDebug("write_vio reenabled"); - if (this->_write_vio.nbytes > 0) { - int event = (this->_write_vio.ntodo() == 0) ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY; - - if (this->_write_event == nullptr) { - this->_write_event = this_ethread()->schedule_imm_local(this, event); - } + int64_t len = this->_process_write_vio(); + if (len > 0) { + this->_signal_write_event(); } } } -/** - * @brief Signal event to this->_read_vio._cont - * @param (call_update) If true, safe to call vio handler directly. - * Or called from do_io_read. Still setting things up. Send event to handle this after the dust settles - */ void -QUICStream::_signal_read_event(bool direct) +QUICStream::set_read_vio_nbytes(int64_t nbytes) { - int event = (this->_read_vio.ntodo() == 0) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY; - Continuation *cont = this->_read_vio._cont; - - if (direct) { - Event *e = eventAllocator.alloc(); - e->callback_event = event; - e->cookie = this; - e->init(cont, 0, 0); - - cont->handleEvent(event, e); - } else { - this_ethread()->schedule_imm(cont, event, this); - } + this->_read_vio.nbytes = nbytes; } -/** - * @brief Signal event to this->_write_vio._cont - * @param (call_update) If true, safe to call vio handler directly. - * Or called from do_io_write. Still setting things up. Send event to handle this after the dust settles - */ void -QUICStream::_signal_write_event(bool direct) +QUICStream::set_write_vio_nbytes(int64_t nbytes) { - if (this->_write_vio.get_writer()->write_avail() == 0) { - QUICStreamDebug("wvio.write_avail=0"); - return; - } - - int event = (this->_write_vio.ntodo() == 0) ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY; - Continuation *cont = this->_write_vio._cont; - - if (direct) { - Event *e = eventAllocator.alloc(); - e->callback_event = event; - e->cookie = this; - e->init(cont, 0, 0); - - cont->handleEvent(event, e); - } else { - this_ethread()->schedule_imm(cont, event, this); - } + this->_write_vio.nbytes = nbytes; } void @@ -333,6 +332,8 @@ QUICStream::recv(const std::shared_ptr<const QUICStreamFrame> frame) new_frame = this->_received_stream_frame_buffer.pop(); } + this->_signal_read_event(); + return QUICErrorUPtr(new QUICNoError()); } @@ -343,12 +344,10 @@ QUICStream::recv(const std::shared_ptr<const QUICMaxStreamDataFrame> frame) QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller->current_offset(), this->_remote_flow_controller->current_limit()); - if (this->_write_vio.op == VIO::WRITE) { - // restart sending - QUICStreamDebug("restart sending"); - - this->_send(); - this->_signal_write_event(false); + QUICStreamDebug("restart sending"); + int64_t len = this->_process_write_vio(); + if (len > 0) { + this->_signal_write_event(); } return QUICErrorUPtr(new QUICNoError()); @@ -362,12 +361,94 @@ QUICStream::recv(const std::shared_ptr<const QUICStreamBlockedFrame> frame) } /** + * Replace existing event only if the new event is different than the inprogress event + */ +Event * +QUICStream::_send_tracked_event(Event *event, int send_event, VIO *vio) +{ + if (event != nullptr) { + if (event->callback_event != send_event) { + event->cancel(); + event = nullptr; + } + } + + if (event == nullptr) { + event = this_ethread()->schedule_imm(this, send_event, vio); + } + + return event; +} + +/** + * @brief Signal event to this->_read_vio._cont + */ +void +QUICStream::_signal_read_event() +{ + if (this->_read_vio._cont == nullptr || this->_read_vio.op == VIO::NONE) { + return; + } + MUTEX_TRY_LOCK(lock, this->_read_vio.mutex, this_ethread()); + + int event = this->_read_vio.ntodo() ? VC_EVENT_READ_READY : VC_EVENT_READ_COMPLETE; + + if (lock.is_locked()) { + this->_read_vio._cont->handleEvent(event, &this->_read_vio); + } else { + this_ethread()->schedule_imm(this->_read_vio._cont, event, &this->_read_vio); + } + + QUICStreamDebug("%s", QUICDebugNames::vc_event(event)); +} + +/** + * @brief Signal event to this->_write_vio._cont + */ +void +QUICStream::_signal_write_event() +{ + if (this->_write_vio._cont == nullptr || this->_write_vio.op == VIO::NONE) { + return; + } + MUTEX_TRY_LOCK(lock, this->_write_vio.mutex, this_ethread()); + + int event = this->_write_vio.ntodo() ? VC_EVENT_WRITE_READY : VC_EVENT_WRITE_COMPLETE; + + if (lock.is_locked()) { + this->_write_vio._cont->handleEvent(event, &this->_write_vio); + } else { + this_ethread()->schedule_imm(this->_write_vio._cont, event, &this->_write_vio); + } + + QUICStreamDebug("%s", QUICDebugNames::vc_event(event)); +} + +int64_t +QUICStream::_process_read_vio() +{ + if (this->_read_vio._cont == nullptr || this->_read_vio.op == VIO::NONE) { + return 0; + } + + // Pass through. Read operation is done by QUICStream::recv(const std::shared_ptr<const QUICStreamFrame> frame) + // TODO: 1. pop frame from _received_stream_frame_buffer + // 2. write data to _read_vio + + return 0; +} + +/** * @brief Send STREAM DATA from _response_buffer * @detail Call _signal_write_event() to indicate event upper layer */ -QUICErrorUPtr -QUICStream::_send() +int64_t +QUICStream::_process_write_vio() { + if (this->_write_vio._cont == nullptr || this->_write_vio.op == VIO::NONE) { + return 0; + } + SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread()); QUICErrorUPtr error = std::unique_ptr<QUICError>(new QUICNoError()); @@ -420,7 +501,7 @@ QUICStream::_send() this->_tx->transmit_frame(std::move(frame)); } - return error; + return total_len; } void diff --git a/iocore/net/quic/QUICStream.h b/iocore/net/quic/QUICStream.h index 282b434..387e83c 100644 --- a/iocore/net/quic/QUICStream.h +++ b/iocore/net/quic/QUICStream.h @@ -46,22 +46,26 @@ class QUICStream : public VConnection { public: QUICStream() : VConnection(nullptr), _received_stream_frame_buffer(this) {} - ~QUICStream() {} + ~QUICStream(); void init(QUICFrameTransmitter *tx, QUICConnectionId cid, QUICStreamId id, uint64_t recv_max_stream_data = 0, uint64_t send_max_stream_data = 0); - void start(); + // void start(); + int state_stream_open(int event, void *data); + int state_stream_closed(int event, void *data); + void init_flow_control_params(uint32_t recv_max_stream_data, uint32_t send_max_stream_data); - int main_event_handler(int event, void *data); QUICStreamId id() const; QUICOffset final_offset(); - // Implement VConnection interface. - VIO *do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = nullptr) override; + // Implement VConnection Interface. + VIO *do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = 0) override; VIO *do_io_write(Continuation *c = nullptr, int64_t nbytes = INT64_MAX, IOBufferReader *buf = 0, bool owner = false) override; void do_io_close(int lerrno = -1) override; void do_io_shutdown(ShutdownHowTo_t howto) override; void reenable(VIO *vio) override; + void set_read_vio_nbytes(int64_t); + void set_write_vio_nbytes(int64_t); QUICErrorUPtr recv(const std::shared_ptr<const QUICStreamFrame> frame); QUICErrorUPtr recv(const std::shared_ptr<const QUICMaxStreamDataFrame> frame); @@ -78,20 +82,15 @@ public: LINK(QUICStream, link); private: - QUICStreamState _state; - - QUICErrorUPtr _send(); + int64_t _process_read_vio(); + int64_t _process_write_vio(); + void _signal_read_event(); + void _signal_write_event(); + Event *_send_tracked_event(Event *, int, VIO *); void _write_to_read_vio(const std::shared_ptr<const QUICStreamFrame> &); - // NOTE: Those are called update_read_request/update_write_request in Http2Stream - // void _read_from_net(uint64_t read_len, bool direct); - // void _write_to_net(IOBufferReader *buf_reader, int64_t write_len, bool direct); - - void _signal_read_event(bool call_update); - void _signal_write_event(bool call_update); - - Event *_send_tracked_event(Event *event, int send_event, VIO *vio); + QUICStreamState _state; bool _fin = false; QUICConnectionId _connection_id = 0; QUICStreamId _id = 0; @@ -111,6 +110,5 @@ private: // TODO: Consider to replace with ts/RbTree.h or other data structure QUICIncomingFrameBuffer _received_stream_frame_buffer; - QUICStreamManager *_stream_manager = nullptr; - QUICFrameTransmitter *_tx = nullptr; + QUICFrameTransmitter *_tx = nullptr; }; diff --git a/iocore/net/quic/QUICStreamManager.cc b/iocore/net/quic/QUICStreamManager.cc index 3b6ac32..5223bb4 100644 --- a/iocore/net/quic/QUICStreamManager.cc +++ b/iocore/net/quic/QUICStreamManager.cc @@ -40,7 +40,10 @@ std::vector<QUICFrameType> QUICStreamManager::interests() { return { - QUICFrameType::STREAM, QUICFrameType::RST_STREAM, QUICFrameType::MAX_STREAM_DATA, QUICFrameType::MAX_STREAM_ID, + QUICFrameType::STREAM, + QUICFrameType::RST_STREAM, + QUICFrameType::MAX_STREAM_DATA, + QUICFrameType::MAX_STREAM_ID, }; } @@ -147,13 +150,7 @@ QUICStreamManager::_handle_frame(const std::shared_ptr<const QUICStreamFrame> &f if (!application->is_stream_set(stream)) { application->set_stream(stream); } - - size_t nbytes_to_read = stream->nbytes_to_read(); - QUICErrorUPtr error = stream->recv(frame); - // Prevent trigger read events multiple times - if (nbytes_to_read == 0) { - this_ethread()->schedule_imm(application, VC_EVENT_READ_READY, stream); - } + QUICErrorUPtr error = stream->recv(frame); return error; } @@ -213,8 +210,6 @@ QUICStreamManager::_find_or_create_stream(QUICStreamId stream_id) this->_remote_tp->getAsUInt32(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA)); } - stream->start(); - this->stream_list.push(stream); } return stream; diff --git a/iocore/net/quic/test/test_QUICStream.cc b/iocore/net/quic/test/test_QUICStream.cc index bd2f648..00dd32a 100644 --- a/iocore/net/quic/test/test_QUICStream.cc +++ b/iocore/net/quic/test/test_QUICStream.cc @@ -205,31 +205,31 @@ TEST_CASE("QUICStream", "[quic]") const char data[1024] = {0}; write_buffer->write(data, 1024); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 1); write_buffer->write(data, 1024); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 2); write_buffer->write(data, 1024); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 3); write_buffer->write(data, 1024); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 4); // This should not send a frame because of flow control write_buffer->write(data, 1024); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 4); // Update window stream->recv(std::make_shared<QUICMaxStreamDataFrame>(stream_id, 5120)); // This should send a frame - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 5); // Update window @@ -237,16 +237,16 @@ TEST_CASE("QUICStream", "[quic]") // This should send a frame write_buffer->write(data, 1024); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 6); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 6); // Update window stream->recv(std::make_shared<QUICMaxStreamDataFrame>(stream_id, 6144)); - stream->main_event_handler(VC_EVENT_WRITE_READY, nullptr); + stream->handleEvent(VC_EVENT_WRITE_READY, nullptr); CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM)] == 7); } } diff --git a/proxy/hq/HQClientSession.cc b/proxy/hq/HQClientSession.cc index af4d988..b8ede45 100644 --- a/proxy/hq/HQClientSession.cc +++ b/proxy/hq/HQClientSession.cc @@ -52,7 +52,7 @@ HQClientSession::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *bu void HQClientSession::do_io_close(int lerrno) { - ink_assert(false); + // TODO return; } diff --git a/proxy/hq/HQClientTransaction.cc b/proxy/hq/HQClientTransaction.cc index cc3a255..69939cc 100644 --- a/proxy/hq/HQClientTransaction.cc +++ b/proxy/hq/HQClientTransaction.cc @@ -32,15 +32,15 @@ #define HQTransDebug(fmt, ...) \ Debug("hq_trans", "[%" PRId64 "] [%" PRIx32 "] " fmt, this->parent->connection_id(), this->get_transaction_id(), ##__VA_ARGS__) -static void -dump_io_buffer(IOBufferReader *reader) -{ - IOBufferReader *debug_reader = reader->clone(); - uint8_t msg[1024] = {0}; - int64_t msg_len = 1024; - int64_t read_len = debug_reader->read(msg, msg_len); - Debug("hq_trans", "len=%" PRId64 "\n%s\n", read_len, msg); -} +// static void +// dump_io_buffer(IOBufferReader *reader) +// { +// IOBufferReader *debug_reader = reader->clone(); +// uint8_t msg[1024] = {0}; +// int64_t msg_len = 1024; +// int64_t read_len = debug_reader->read(msg, msg_len); +// Debug("v_hq_trans", "len=%" PRId64 "\n%s\n", read_len, msg); +// } HQClientTransaction::HQClientTransaction(HQClientSession *session, QUICStreamIO *stream_io) : super(), _stream_io(stream_io) @@ -50,7 +50,7 @@ HQClientTransaction::HQClientTransaction(HQClientSession *session, QUICStreamIO this->sm_reader = this->_read_vio_buf.alloc_reader(); static_cast<HQClientSession *>(this->parent)->add_transaction(this); - SET_HANDLER(&HQClientTransaction::main_event_handler); + SET_HANDLER(&HQClientTransaction::state_stream_open); } void @@ -91,68 +91,77 @@ HQClientTransaction::allow_half_open() const } int -HQClientTransaction::main_event_handler(int event, void *edata) +HQClientTransaction::state_stream_open(int event, void *edata) { - Debug("hq_trans", "%s", QUICDebugNames::vc_event(event)); + // TODO: should check recursive call? + HQTransDebug("%s", get_vc_event_name(event)); switch (event) { case VC_EVENT_READ_READY: case VC_EVENT_READ_COMPLETE: { - if (edata == this->_read_event) { - this->_read_event = nullptr; - } - if (this->_stream_io->read_avail()) { - this->_read_request(); + int64_t len = this->_process_read_vio(); + // if no progress, don't need to signal + if (len > 0) { + this->_signal_read_event(); } + this->_stream_io->read_reenable(); + break; } case VC_EVENT_WRITE_READY: case VC_EVENT_WRITE_COMPLETE: { - if (edata == this->_write_event) { - this->_write_event = nullptr; + int64_t len = this->_process_write_vio(); + if (len > 0) { + this->_signal_write_event(); } - if (this->_write_vio.get_reader()->read_avail()) { - this->_write_response(); - } - - HQTransDebug("wvio.nbytes=%" PRId64 " wvio.ndone=%" PRId64 " wvio.read_avail=%" PRId64 " wvio.write_avail=%" PRId64, - this->_write_vio.nbytes, this->_write_vio.ndone, this->_write_vio.get_reader()->read_avail(), - this->_write_vio.get_writer()->write_avail()); + this->_stream_io->write_reenable(); break; } + case VC_EVENT_EOS: + case VC_EVENT_ERROR: + case VC_EVENT_INACTIVITY_TIMEOUT: + case VC_EVENT_ACTIVE_TIMEOUT: { + ink_assert(false); + break; + } default: Debug("hq_trans", "Unknown event %d", event); ink_assert(false); } - return EVENT_CONT; + return EVENT_DONE; } -void -HQClientTransaction::reenable(VIO *vio) +int +HQClientTransaction::state_stream_closed(int event, void *data) { - if (vio->op == VIO::READ) { - SCOPED_MUTEX_LOCK(lock, this->_read_vio.mutex, this_ethread()); - - if (this->_read_vio.nbytes > 0) { - int event = (this->_read_vio.ntodo() == 0) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY; + HQTransDebug("%s", get_vc_event_name(event)); - if (this->_read_event == nullptr) { - this->_read_event = this_ethread()->schedule_imm_local(this, event); - } - } - } else if (vio->op == VIO::WRITE) { - SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread()); - - if (this->_write_vio.nbytes > 0) { - int event = (this->_write_vio.ntodo() == 0) ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY; - - if (this->_write_event == nullptr) { - this->_write_event = this_ethread()->schedule_imm_local(this, event); - } - } + switch (event) { + case VC_EVENT_READ_READY: + case VC_EVENT_READ_COMPLETE: { + // ignore + break; + } + case VC_EVENT_WRITE_READY: + case VC_EVENT_WRITE_COMPLETE: { + // ignore + break; } + case VC_EVENT_EOS: + case VC_EVENT_ERROR: + case VC_EVENT_INACTIVITY_TIMEOUT: + case VC_EVENT_ACTIVE_TIMEOUT: { + // TODO + ink_assert(false); + break; + } + default: + ink_assert(false); + } + + return EVENT_DONE; } VIO * @@ -171,7 +180,8 @@ HQClientTransaction::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) this->_read_vio.vc_server = this; this->_read_vio.op = VIO::READ; - this->_read_vio.reenable(); + this->_process_read_vio(); + this->_send_tracked_event(this->_read_event, VC_EVENT_READ_READY, &this->_read_vio); return &this->_read_vio; } @@ -192,7 +202,8 @@ HQClientTransaction::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader this->_write_vio.vc_server = this; this->_write_vio.op = VIO::WRITE; - this->_write_vio.reenable(); + this->_process_write_vio(); + this->_send_tracked_event(this->_write_event, VC_EVENT_WRITE_READY, &this->_write_vio); return &this->_write_vio; } @@ -200,27 +211,143 @@ HQClientTransaction::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader void HQClientTransaction::do_io_close(int lerrno) { + SET_HANDLER(&HQClientTransaction::state_stream_closed); + + if (this->_read_event) { + this->_read_event->cancel(); + this->_read_event = nullptr; + } + + if (this->_write_event) { + this->_write_event->cancel(); + this->_write_event = nullptr; + } + + this->_read_vio.buffer.clear(); + this->_read_vio.nbytes = 0; + this->_read_vio.op = VIO::NONE; + this->_read_vio._cont = nullptr; + + this->_write_vio.buffer.clear(); + this->_write_vio.nbytes = 0; + this->_write_vio.op = VIO::NONE; + this->_write_vio._cont = nullptr; + parent->do_io_close(lerrno); } void +HQClientTransaction::do_io_shutdown(ShutdownHowTo_t howto) +{ + return; +} + +void +HQClientTransaction::reenable(VIO *vio) +{ + if (vio->op == VIO::READ) { + int64_t len = this->_process_read_vio(); + if (len > 0) { + this->_signal_read_event(); + } + } else if (vio->op == VIO::WRITE) { + int64_t len = this->_process_write_vio(); + if (len > 0) { + this->_signal_write_event(); + } + } +} + +/** + * @brief Replace existing event only if the new event is different than the inprogress event + */ +Event * +HQClientTransaction::_send_tracked_event(Event *event, int send_event, VIO *vio) +{ + if (event != nullptr) { + if (event->callback_event != send_event) { + event->cancel(); + event = nullptr; + } + } + + if (event == nullptr) { + event = this_ethread()->schedule_imm(this, send_event, vio); + } + + return event; +} + +void +HQClientTransaction::set_read_vio_nbytes(int64_t nbytes) +{ + this->_read_vio.nbytes = nbytes; +} + +void +HQClientTransaction::set_write_vio_nbytes(int64_t nbytes) +{ + this->_write_vio.nbytes = nbytes; +} + +void HQClientTransaction::destroy() { current_reader = nullptr; } +/** + * @brief Signal event to this->_read_vio._cont + */ void -HQClientTransaction::do_io_shutdown(ShutdownHowTo_t howto) +HQClientTransaction::_signal_read_event() { - if (parent) { - parent->do_io_shutdown(howto); + if (this->_read_vio._cont == nullptr || this->_read_vio.op == VIO::NONE) { + return; + } + int event = this->_read_vio.ntodo() ? VC_EVENT_READ_READY : VC_EVENT_READ_COMPLETE; + + MUTEX_TRY_LOCK(lock, this->_read_vio.mutex, this_ethread()); + if (lock.is_locked()) { + this->_read_vio._cont->handleEvent(event, &this->_read_vio); + } else { + this_ethread()->schedule_imm(this->_read_vio._cont, event, &this->_read_vio); } + + HQTransDebug("%s", get_vc_event_name(event)); } -// Convert HTTP/0.9 to HTTP/1.1 +/** + * @brief Signal event to this->_write_vio._cont + */ void -HQClientTransaction::_read_request() +HQClientTransaction::_signal_write_event() { + if (this->_write_vio._cont == nullptr || this->_write_vio.op == VIO::NONE) { + return; + } + int event = this->_write_vio.ntodo() ? VC_EVENT_WRITE_READY : VC_EVENT_WRITE_COMPLETE; + + MUTEX_TRY_LOCK(lock, this->_write_vio.mutex, this_ethread()); + if (lock.is_locked()) { + this->_write_vio._cont->handleEvent(event, &this->_write_vio); + } else { + this_ethread()->schedule_imm(this->_write_vio._cont, event, &this->_write_vio); + } + + HQTransDebug("%s", get_vc_event_name(event)); +} + +// Convert HTTP/0.9 to HTTP/1.1 +int64_t +HQClientTransaction::_process_read_vio() +{ + if (this->_read_vio._cont == nullptr || this->_read_vio.op == VIO::NONE) { + return 0; + } + + SCOPED_MUTEX_LOCK(lock, this->_read_vio.mutex, this_ethread()); + IOBufferReader *client_vio_reader = this->_stream_io->get_read_buffer_reader(); int64_t bytes_avail = client_vio_reader->read_avail(); @@ -233,33 +360,40 @@ HQClientTransaction::_read_request() MIOBuffer *writer = this->_read_vio.get_writer(); writer->write(client_vio_reader, bytes_avail - n); + client_vio_reader->consume(bytes_avail); // FIXME: Get hostname from SNI? const char version[] = " HTTP/1.1\r\nHost: localhost\r\n\r\n"; writer->write(version, sizeof(version)); - dump_io_buffer(this->sm_reader); - - this->_read_vio._cont->handleEvent(VC_EVENT_READ_READY, &this->_read_vio); + return bytes_avail; } // FIXME: already defined somewhere? static constexpr char http_1_1_version[] = "HTTP/1.1"; // Convert HTTP/1.1 to HTTP/0.9 -void -HQClientTransaction::_write_response() +int64_t +HQClientTransaction::_process_write_vio() { + if (this->_write_vio._cont == nullptr || this->_write_vio.op == VIO::NONE) { + return 0; + } + SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread()); IOBufferReader *reader = this->_write_vio.get_reader(); - if (memcmp(reader->start(), http_1_1_version, sizeof(http_1_1_version) - 1) == 0) { + int64_t http_1_1_version_len = sizeof(http_1_1_version) - 1; + + if (reader->is_read_avail_more_than(http_1_1_version_len) && + memcmp(reader->start(), http_1_1_version, http_1_1_version_len) == 0) { // Skip HTTP/1.1 response headers IOBufferBlock *headers = reader->get_current_block(); int64_t headers_size = headers->read_avail(); reader->consume(headers_size); this->_write_vio.ndone += headers_size; + // The size of respons to client this->_stream_io->set_write_vio_nbytes(this->_write_vio.nbytes - headers_size); } @@ -267,11 +401,16 @@ HQClientTransaction::_write_response() // Write HTTP/1.1 response body int64_t bytes_avail = reader->read_avail(); int64_t total_written = 0; + + HQTransDebug("%" PRId64, bytes_avail); + while (total_written < bytes_avail) { - int64_t bytes_written = this->_stream_io->write(reader, bytes_avail); - if (bytes_written == 0) { + int64_t data_len = reader->block_read_avail(); + int64_t bytes_written = this->_stream_io->write(reader, data_len); + if (bytes_written <= 0) { break; } + reader->consume(bytes_written); this->_write_vio.ndone += bytes_written; total_written += bytes_written; @@ -283,12 +422,7 @@ HQClientTransaction::_write_response() this->_stream_io->shutdown(); } - this->_stream_io->write_reenable(); - - // Send back WRITE_READY event to HttpTunnel - if (this->_write_vio.ntodo() > 0 && this->_write_vio.get_writer()->write_avail()) { - this->_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &this->_write_vio); - } + return total_written; } void diff --git a/proxy/hq/HQClientTransaction.h b/proxy/hq/HQClientTransaction.h index cad8b6c..550f6b3 100644 --- a/proxy/hq/HQClientTransaction.h +++ b/proxy/hq/HQClientTransaction.h @@ -23,6 +23,7 @@ #pragma once +#include "I_VConnection.h" #include "ProxyClientTransaction.h" class QUICStreamIO; @@ -35,11 +36,6 @@ public: HQClientTransaction(HQClientSession *session, QUICStreamIO *stream_io); - // Implement VConnection interface. - VIO *do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = 0) override; - VIO *do_io_write(Continuation *c = nullptr, int64_t nbytes = INT64_MAX, IOBufferReader *buf = 0, bool owner = false) override; - void do_io_close(int lerrno = -1) override; - // Implement ProxyClienTransaction interface void set_active_timeout(ink_hrtime timeout_in) override; void set_inactivity_timeout(ink_hrtime timeout_in) override; @@ -47,22 +43,35 @@ public: void transaction_done() override; bool allow_half_open() const override; void destroy() override; - void do_io_shutdown(ShutdownHowTo_t howto) override; - void reenable(VIO *vio) override; void release(IOBufferReader *r) override; int get_transaction_id() const override; + // VConnection interface + VIO *do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = 0) override; + VIO *do_io_write(Continuation *c = nullptr, int64_t nbytes = INT64_MAX, IOBufferReader *buf = 0, bool owner = false) override; + void do_io_close(int lerrno = -1) override; + void do_io_shutdown(ShutdownHowTo_t) override; + void reenable(VIO *) override; + + void set_read_vio_nbytes(int64_t nbytes); + void set_write_vio_nbytes(int64_t nbytes); + // HQClientTransaction specific methods - int main_event_handler(int event, void *edata); + int state_stream_open(int, void *); + int state_stream_closed(int event, void *data); private: - void _read_request(); - void _write_response(); + Event *_send_tracked_event(Event *, int, VIO *); + void _signal_read_event(); + void _signal_write_event(); + int64_t _process_read_vio(); + int64_t _process_write_vio(); + + MIOBuffer _read_vio_buf = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX; + QUICStreamIO *_stream_io = nullptr; - MIOBuffer _read_vio_buf = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX; VIO _read_vio; VIO _write_vio; - QUICStreamIO *_stream_io = nullptr; - Event *_read_event = nullptr; - Event *_write_event = nullptr; + Event *_read_event = nullptr; + Event *_write_event = nullptr; }; diff --git a/proxy/hq/QUICSimpleApp.cc b/proxy/hq/QUICSimpleApp.cc index a245642..6b9583b 100644 --- a/proxy/hq/QUICSimpleApp.cc +++ b/proxy/hq/QUICSimpleApp.cc @@ -56,18 +56,24 @@ QUICSimpleApp::main_event_handler(int event, Event *data) { Debug(tag, "%s", QUICDebugNames::vc_event(event)); - QUICStream *stream = reinterpret_cast<QUICStream *>(data->cookie); - QUICStreamIO *stream_io = this->_find_stream_io(stream->id()); + VIO *vio = reinterpret_cast<VIO *>(data); + + QUICStreamIO *stream_io = this->_find_stream_io(vio); + QUICStreamId stream_id = this->_find_stream_id(vio); + if (stream_io == nullptr) { - Debug(tag, "Unknown Stream, id: %d", stream->id()); + Debug(tag, "Unknown Stream"); return -1; } switch (event) { case VC_EVENT_READ_READY: case VC_EVENT_READ_COMPLETE: { + // TODO: lookup transaction if support POST request if (stream_io->read_avail()) { HQClientTransaction *trans = new HQClientTransaction(this->_client_session, stream_io); + SCOPED_MUTEX_LOCK(lock, trans->mutex, this_ethread()); + trans->new_transaction(); } else { Debug(tag, "No MSG"); @@ -76,8 +82,7 @@ QUICSimpleApp::main_event_handler(int event, Event *data) } case VC_EVENT_WRITE_READY: case VC_EVENT_WRITE_COMPLETE: { - HQClientTransaction *trans = this->_client_session->get_transaction(stream->id()); - + HQClientTransaction *trans = this->_client_session->get_transaction(stream_id); trans->handleEvent(event); break; -- To stop receiving notification emails like this one, please contact ['"commits@trafficserver.apache.org" <commits@trafficserver.apache.org>'].