This is an automated email from the ASF dual-hosted git repository. maskit pushed a commit to branch quic-05 in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit 46cf6d50173562002642278d0c6d9cb0886f3cbe Author: scw00 <sc...@apache.org> AuthorDate: Tue Oct 10 08:19:00 2017 +0800 add QUICInBuffer feature (cherry picked from commit 2f5caad25df450b16f9458c7119a421652f6ddb5) --- iocore/net/quic/Makefile.am | 3 +- iocore/net/quic/QUICIncomingFrameBuffer.cc | 135 ++++++++++++++++ iocore/net/quic/QUICIncomingFrameBuffer.h | 58 +++++++ iocore/net/quic/QUICStream.cc | 36 ++--- iocore/net/quic/QUICStream.h | 7 +- iocore/net/quic/test/Makefile.am | 35 +++- .../net/quic/test/test_QUICIncomingFrameBuffer.cc | 177 +++++++++++++++++++++ 7 files changed, 421 insertions(+), 30 deletions(-) diff --git a/iocore/net/quic/Makefile.am b/iocore/net/quic/Makefile.am index 90ba2fe..3fc2585 100644 --- a/iocore/net/quic/Makefile.am +++ b/iocore/net/quic/Makefile.am @@ -61,7 +61,8 @@ libquic_a_SOURCES = \ QUICConfig.cc \ QUICDebugNames.cc \ QUICApplication.cc \ - QUICApplicationMap.cc + QUICApplicationMap.cc \ + QUICIncomingFrameBuffer.cc include $(top_srcdir)/build/tidy.mk diff --git a/iocore/net/quic/QUICIncomingFrameBuffer.cc b/iocore/net/quic/QUICIncomingFrameBuffer.cc new file mode 100644 index 0000000..e769054 --- /dev/null +++ b/iocore/net/quic/QUICIncomingFrameBuffer.cc @@ -0,0 +1,135 @@ +/** @file + * + * A brief file description + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QUICIncomingFrameBuffer.h" + +QUICIncomingFrameBuffer::~QUICIncomingFrameBuffer() +{ + this->_out_of_order_queue.clear(); + + while (!this->_recv_buffer.empty()) { + this->_recv_buffer.pop(); + } +} + +std::shared_ptr<const QUICStreamFrame> +QUICIncomingFrameBuffer::pop() +{ + if (this->_recv_buffer.empty()) { + auto frame = this->_out_of_order_queue.find(this->_recv_offset); + while (frame != this->_out_of_order_queue.end()) { + this->_recv_buffer.push(frame->second); + this->_recv_offset += frame->second->data_length(); + this->_out_of_order_queue.erase(frame); + frame = this->_out_of_order_queue.find(this->_recv_offset); + } + } + + if (!this->_recv_buffer.empty()) { + auto frame = this->_recv_buffer.front(); + this->_recv_buffer.pop(); + return frame; + } + return nullptr; +} + +QUICErrorUPtr +QUICIncomingFrameBuffer::insert(const std::shared_ptr<const QUICStreamFrame> frame) +{ + QUICOffset offset = frame->offset(); + size_t len = frame->data_length(); + + QUICErrorUPtr err = this->_check_and_set_fin_flag(offset, len, frame->has_fin_flag()); + if (err->cls != QUICErrorClass::NONE) { + return err; + } + + if (this->_recv_offset > offset) { + // dup frame; + return QUICErrorUPtr(new QUICNoError()); + } else if (this->_recv_offset == offset) { + this->_recv_offset = offset + len; + this->_recv_buffer.push(frame); + } else { + this->_out_of_order_queue.insert(std::make_pair(offset, frame)); + } + + return QUICErrorUPtr(new QUICNoError()); +} + +void +QUICIncomingFrameBuffer::clear() +{ + this->_out_of_order_queue.clear(); + + while (!this->_recv_buffer.empty()) { + this->_recv_buffer.pop(); + } + + this->_fin_offset = UINT64_MAX; + this->_max_offset = 0; + this->_recv_offset = 0; +} + +bool +QUICIncomingFrameBuffer::empty() +{ + return this->_out_of_order_queue.empty() && this->_recv_buffer.empty(); +} + +QUICErrorUPtr +QUICIncomingFrameBuffer::_check_and_set_fin_flag(QUICOffset offset, size_t len, bool fin_flag) +{ + // stream with fin flag {11.3. Stream Final Offset} + // Once a final offset for a stream is known, it cannot change. + // If a RST_STREAM or STREAM frame causes the final offset to change for a stream, + // an endpoint SHOULD respond with a FINAL_OFFSET_ERROR error (see Section 12). + // A receiver SHOULD treat receipt of data at or beyond the final offset as a + // FINAL_OFFSET_ERROR error, even after a stream is closed. + + // {11.3. Stream Final Offset} + // A receiver SHOULD treat receipt of data at or beyond the final offset as a + // FINAL_OFFSET_ERROR error, even after a stream is closed. + if (fin_flag) { + if (this->_fin_offset != UINT64_MAX) { + if (this->_fin_offset == offset) { + // dup fin frame + return QUICErrorUPtr(new QUICNoError()); + } + return QUICErrorUPtr(new QUICStreamError(this->_stream, QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::FINAL_OFFSET_ERROR)); + } + + this->_fin_offset = offset; + + if (this->_max_offset >= this->_fin_offset) { + return QUICErrorUPtr(new QUICStreamError(this->_stream, QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::FINAL_OFFSET_ERROR)); + } + + } else if (this->_fin_offset != UINT64_MAX && this->_fin_offset <= offset) { + return QUICErrorUPtr(new QUICStreamError(this->_stream, QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::FINAL_OFFSET_ERROR)); + } + + this->_max_offset = std::max(offset, this->_max_offset); + + return QUICErrorUPtr(new QUICNoError()); +} diff --git a/iocore/net/quic/QUICIncomingFrameBuffer.h b/iocore/net/quic/QUICIncomingFrameBuffer.h new file mode 100644 index 0000000..ab79a44 --- /dev/null +++ b/iocore/net/quic/QUICIncomingFrameBuffer.h @@ -0,0 +1,58 @@ +/** @file + * + * A brief file description + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <map> +#include <queue> + +#include "QUICTypes.h" +#include "QUICFrame.h" + +class QUICIncomingFrameBuffer +{ +public: + QUICIncomingFrameBuffer(QUICStream *stream) : _stream(stream) {} + + ~QUICIncomingFrameBuffer(); + + std::shared_ptr<const QUICStreamFrame> pop(); + + QUICErrorUPtr insert(const std::shared_ptr<const QUICStreamFrame>); + + void clear(); + + bool empty(); + +private: + QUICOffset _recv_offset = 0; + QUICOffset _max_offset = 0; + QUICOffset _fin_offset = UINT64_MAX; + + QUICErrorUPtr _check_and_set_fin_flag(QUICOffset offset, size_t len = 0, bool fin_flag = false); + + std::queue<std::shared_ptr<const QUICStreamFrame>> _recv_buffer; + std::map<QUICOffset, std::shared_ptr<const QUICStreamFrame>> _out_of_order_queue; + + QUICStream *_stream = nullptr; +}; diff --git a/iocore/net/quic/QUICStream.cc b/iocore/net/quic/QUICStream.cc index 7a7bbdc..f0ec0ff 100644 --- a/iocore/net/quic/QUICStream.cc +++ b/iocore/net/quic/QUICStream.cc @@ -273,25 +273,14 @@ QUICStream::_write_to_read_vio(const std::shared_ptr<const QUICStreamFrame> &fra int bytes_added = this->_read_vio.buffer.writer()->write(frame->data(), frame->data_length()); this->_read_vio.nbytes += bytes_added; - this->_recv_offset += frame->data_length(); - this->_local_flow_controller->forward_limit(this->_recv_offset + this->_flow_control_buffer_size); + // frame->offset() + frame->data_length() == this->_recv_offset + this->_local_flow_controller->forward_limit(frame->offset() + frame->data_length() + this->_flow_control_buffer_size); DebugQUICStreamFC("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller->current_offset(), this->_local_flow_controller->current_limit()); this->_state.update_with_received_frame(*frame); } -void -QUICStream::_reorder_data() -{ - auto frame = _received_stream_frame_buffer.find(this->_recv_offset); - while (frame != this->_received_stream_frame_buffer.end()) { - this->_write_to_read_vio(frame->second); - this->_received_stream_frame_buffer.erase(frame); - frame = _received_stream_frame_buffer.find(this->_recv_offset); - } -} - /** * @brief Receive STREAM frame * @detail When receive STREAM frame, reorder frames and write to buffer of read_vio. @@ -317,17 +306,16 @@ QUICStream::recv(const std::shared_ptr<const QUICStreamFrame> frame) return error; } - // Reordering - Some frames may be delayed or be dropped - if (this->_recv_offset > frame->offset()) { - // Do nothing. Just ignore STREAM frame. - return QUICErrorUPtr(new QUICNoError()); - } else if (this->_recv_offset == frame->offset()) { - this->_write_to_read_vio(frame); - this->_reorder_data(); - } else { - // NOTE: push fragments in _received_stream_frame_buffer temporally. - // They will be reordered when missing data is filled and offset is matched. - this->_received_stream_frame_buffer.insert(std::make_pair(frame->offset(), frame)); + error = this->_received_stream_frame_buffer.insert(frame); + if (error->cls != QUICErrorClass::NONE) { + this->_received_stream_frame_buffer.clear(); + return error; + } + + auto new_frame = this->_received_stream_frame_buffer.pop(); + while (new_frame != nullptr) { + this->_write_to_read_vio(new_frame); + new_frame = this->_received_stream_frame_buffer.pop(); } return QUICErrorUPtr(new QUICNoError()); diff --git a/iocore/net/quic/QUICStream.h b/iocore/net/quic/QUICStream.h index b1d0765..282b434 100644 --- a/iocore/net/quic/QUICStream.h +++ b/iocore/net/quic/QUICStream.h @@ -31,6 +31,7 @@ #include "QUICFrame.h" #include "QUICStreamState.h" #include "QUICFlowController.h" +#include "QUICIncomingFrameBuffer.h" class QUICNetVConnection; class QUICFrameTransmitter; @@ -44,7 +45,7 @@ class QUICStreamManager; class QUICStream : public VConnection { public: - QUICStream() : VConnection(nullptr) {} + QUICStream() : VConnection(nullptr), _received_stream_frame_buffer(this) {} ~QUICStream() {} void init(QUICFrameTransmitter *tx, QUICConnectionId cid, QUICStreamId id, uint64_t recv_max_stream_data = 0, uint64_t send_max_stream_data = 0); @@ -82,7 +83,6 @@ private: QUICErrorUPtr _send(); void _write_to_read_vio(const std::shared_ptr<const QUICStreamFrame> &); - void _reorder_data(); // NOTE: Those are called update_read_request/update_write_request in Http2Stream // void _read_from_net(uint64_t read_len, bool direct); // void _write_to_net(IOBufferReader *buf_reader, int64_t write_len, bool direct); @@ -95,7 +95,6 @@ private: bool _fin = false; QUICConnectionId _connection_id = 0; QUICStreamId _id = 0; - QUICOffset _recv_offset = 0; QUICOffset _send_offset = 0; QUICRemoteStreamFlowController *_remote_flow_controller = nullptr; @@ -110,7 +109,7 @@ private: // Fragments of received STREAM frame (offset is unmatched) // TODO: Consider to replace with ts/RbTree.h or other data structure - std::map<QUICOffset, std::shared_ptr<const QUICStreamFrame>> _received_stream_frame_buffer; + QUICIncomingFrameBuffer _received_stream_frame_buffer; QUICStreamManager *_stream_manager = nullptr; QUICFrameTransmitter *_tx = nullptr; diff --git a/iocore/net/quic/test/Makefile.am b/iocore/net/quic/test/Makefile.am index 487e532..e25eced 100644 --- a/iocore/net/quic/test/Makefile.am +++ b/iocore/net/quic/test/Makefile.am @@ -31,7 +31,8 @@ check_PROGRAMS = \ test_QUICTypeUtil \ test_QUICAckFrameCreator \ test_QUICVersionNegotiator \ - test_QUICFlowController + test_QUICFlowController \ + test_QUICIncomingFrameBuffer AM_CPPFLAGS += \ @@ -220,6 +221,7 @@ test_QUICStream_SOURCES = \ event_processor_main.cc \ test_QUICStream.cc \ ../QUICStream.cc \ + ../QUICIncomingFrameBuffer.cc \ ../QUICFrameDispatcher.cc \ ../QUICStreamManager.cc \ ../QUICApplicationMap.cc \ @@ -489,6 +491,37 @@ test_QUICFlowController_SOURCES = \ $(QUICCrypto_impl) \ ../QUICFrame.cc +# +# test_QUICIncomingFrameBuffer +# +test_QUICIncomingFrameBuffer_CPPFLAGS = \ + $(AM_CPPFLAGS) + +test_QUICIncomingFrameBuffer_LDFLAGS = \ + @AM_LDFLAGS@ + +test_QUICIncomingFrameBuffer_SOURCES = \ + event_processor_main.cc \ + ../QUICStream.cc \ + ../QUICFrameDispatcher.cc \ + ../QUICStreamManager.cc \ + ../QUICApplicationMap.cc \ + ../QUICCongestionController.cc \ + ../../SSLNextProtocolSet.cc \ + ../QUICIncomingFrameBuffer.cc \ + test_QUICIncomingFrameBuffer.cc + +test_QUICIncomingFrameBuffer_LDADD = \ + $(top_builddir)/lib/ts/libtsutil.la \ + $(top_builddir)/iocore/eventsystem/libinkevent.a \ + $(top_builddir)/iocore/net/quic/libquic.a \ + $(top_builddir)/lib/records/librecords_p.a \ + $(top_builddir)/mgmt/libmgmt_p.la \ + $(top_builddir)/lib/ts/libtsutil.la \ + $(top_builddir)/proxy/shared/libUglyLogStubs.a \ + @LIBTCL@ \ + @HWLOC_LIBS@ + include $(top_srcdir)/build/tidy.mk tidy-local: $(DIST_SOURCES) diff --git a/iocore/net/quic/test/test_QUICIncomingFrameBuffer.cc b/iocore/net/quic/test/test_QUICIncomingFrameBuffer.cc new file mode 100644 index 0000000..844607a --- /dev/null +++ b/iocore/net/quic/test/test_QUICIncomingFrameBuffer.cc @@ -0,0 +1,177 @@ +/** @file + * + * A brief file description + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "catch.hpp" + +#include "quic/QUICIncomingFrameBuffer.h" +#include "quic/QUICStream.h" +#include <memory> + +TEST_CASE("QUICIncomingFrameBuffer_fin_offset", "[quic]") +{ + QUICStream *stream = new QUICStream(); + QUICIncomingFrameBuffer buffer(stream); + QUICErrorUPtr err = nullptr; + + uint8_t data[1024] = {0}; + + std::shared_ptr<QUICStreamFrame> stream1_frame_0_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 0); + std::shared_ptr<QUICStreamFrame> stream1_frame_1_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 1024); + std::shared_ptr<QUICStreamFrame> stream1_frame_2_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 2048, true); + std::shared_ptr<QUICStreamFrame> stream1_frame_3_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 3072, true); + std::shared_ptr<QUICStreamFrame> stream1_frame_4_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 4096); + + buffer.insert(stream1_frame_0_r); + buffer.insert(stream1_frame_1_r); + buffer.insert(stream1_frame_2_r); + err = buffer.insert(stream1_frame_3_r); + CHECK(err->code == QUICErrorCode::FINAL_OFFSET_ERROR); + + QUICIncomingFrameBuffer buffer2(stream); + + buffer2.insert(stream1_frame_3_r); + buffer2.insert(stream1_frame_0_r); + buffer2.insert(stream1_frame_1_r); + err = buffer2.insert(stream1_frame_2_r); + CHECK(err->code == QUICErrorCode::FINAL_OFFSET_ERROR); + + QUICIncomingFrameBuffer buffer3(stream); + + buffer3.insert(stream1_frame_4_r); + err = buffer3.insert(stream1_frame_3_r); + CHECK(err->code == QUICErrorCode::FINAL_OFFSET_ERROR); + + delete stream; +} + +TEST_CASE("QUICIncomingFrameBuffer_pop", "[quic]") +{ + QUICStream *stream = new QUICStream(); + QUICIncomingFrameBuffer buffer(stream); + QUICErrorUPtr err = nullptr; + + uint8_t data[1024] = {0}; + + std::shared_ptr<QUICStreamFrame> stream1_frame_0_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 0); + std::shared_ptr<QUICStreamFrame> stream1_frame_1_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 1024); + std::shared_ptr<QUICStreamFrame> stream1_frame_2_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 2048); + std::shared_ptr<QUICStreamFrame> stream1_frame_3_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 3072); + std::shared_ptr<QUICStreamFrame> stream1_frame_4_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 4096, true); + + buffer.insert(stream1_frame_0_r); + buffer.insert(stream1_frame_1_r); + buffer.insert(stream1_frame_2_r); + buffer.insert(stream1_frame_3_r); + buffer.insert(stream1_frame_4_r); + CHECK(!buffer.empty()); + + auto frame = buffer.pop(); + CHECK(frame->offset() == 0); + frame = buffer.pop(); + CHECK(frame->offset() == 1024); + frame = buffer.pop(); + CHECK(frame->offset() == 2048); + frame = buffer.pop(); + CHECK(frame->offset() == 3072); + frame = buffer.pop(); + CHECK(frame->offset() == 4096); + CHECK(buffer.empty()); + + buffer.clear(); + + buffer.insert(stream1_frame_4_r); + buffer.insert(stream1_frame_3_r); + buffer.insert(stream1_frame_2_r); + buffer.insert(stream1_frame_1_r); + buffer.insert(stream1_frame_0_r); + CHECK(!buffer.empty()); + + frame = buffer.pop(); + CHECK(frame->offset() == 0); + frame = buffer.pop(); + CHECK(frame->offset() == 1024); + frame = buffer.pop(); + CHECK(frame->offset() == 2048); + frame = buffer.pop(); + CHECK(frame->offset() == 3072); + frame = buffer.pop(); + CHECK(frame->offset() == 4096); + CHECK(buffer.empty()); + + delete stream; +} + +TEST_CASE("QUICIncomingFrameBuffer_dup_frame", "[quic]") +{ + QUICStream *stream = new QUICStream(); + QUICIncomingFrameBuffer buffer(stream); + QUICErrorUPtr err = nullptr; + + uint8_t data[1024] = {0}; + + std::shared_ptr<QUICStreamFrame> stream1_frame_0_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 0); + std::shared_ptr<QUICStreamFrame> stream1_frame_1_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 1024); + std::shared_ptr<QUICStreamFrame> stream1_frame_2_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 2048, true); + std::shared_ptr<QUICStreamFrame> stream1_frame_3_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 2048, true); + + buffer.insert(stream1_frame_0_r); + buffer.insert(stream1_frame_1_r); + buffer.insert(stream1_frame_2_r); + err = buffer.insert(stream1_frame_3_r); + CHECK(err->cls == QUICErrorClass::NONE); + + auto frame = buffer.pop(); + CHECK(frame->offset() == 0); + frame = buffer.pop(); + CHECK(frame->offset() == 1024); + frame = buffer.pop(); + CHECK(frame->offset() == 2048); + frame = buffer.pop(); + CHECK(frame == nullptr); + CHECK(buffer.empty()); + + buffer.clear(); + + std::shared_ptr<QUICStreamFrame> stream2_frame_0_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 0); + std::shared_ptr<QUICStreamFrame> stream2_frame_1_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 1024); + std::shared_ptr<QUICStreamFrame> stream2_frame_2_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 1024); + std::shared_ptr<QUICStreamFrame> stream2_frame_3_r = QUICFrameFactory::create_stream_frame(data, 1024, 1, 2048, true); + + buffer.insert(stream2_frame_0_r); + buffer.insert(stream2_frame_1_r); + buffer.insert(stream2_frame_2_r); + err = buffer.insert(stream2_frame_3_r); + CHECK(err->cls == QUICErrorClass::NONE); + + frame = buffer.pop(); + CHECK(frame->offset() == 0); + frame = buffer.pop(); + CHECK(frame->offset() == 1024); + frame = buffer.pop(); + CHECK(frame->offset() == 2048); + frame = buffer.pop(); + CHECK(frame == nullptr); + CHECK(buffer.empty()); + + delete stream; +} -- To stop receiving notification emails like this one, please contact "commits@trafficserver.apache.org" <commits@trafficserver.apache.org>.