test/TileQueueTests.cpp | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ wsd/SenderQueue.hpp | 37 ++++++++++++++++++- wsd/TileDesc.hpp | 13 ++++++ 3 files changed, 142 insertions(+), 1 deletion(-)
New commits: commit ea2c3eb15c6e7460559427875422cc069abbc25b Author: Ashod Nakashian <[email protected]> Date: Sun Dec 18 12:04:18 2016 -0500 wsd: deduplicate tile messages in SenderQueue Change-Id: Ib1cc38f34534aa754503ef296871815bc3d5450d Reviewed-on: https://gerrit.libreoffice.org/32158 Reviewed-by: Ashod Nakashian <[email protected]> Tested-by: Ashod Nakashian <[email protected]> diff --git a/test/TileQueueTests.cpp b/test/TileQueueTests.cpp index 868d78e..7df94ef 100644 --- a/test/TileQueueTests.cpp +++ b/test/TileQueueTests.cpp @@ -14,6 +14,7 @@ #include "Common.hpp" #include "Protocol.hpp" #include "MessageQueue.hpp" +#include "SenderQueue.hpp" #include "Util.hpp" namespace CPPUNIT_NS @@ -44,6 +45,8 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture CPPUNIT_TEST(testTileRecombining); CPPUNIT_TEST(testViewOrder); CPPUNIT_TEST(testPreviewsDeprioritization); + CPPUNIT_TEST(testSenderQueue); + CPPUNIT_TEST(testSenderQueueTileDeduplication); CPPUNIT_TEST_SUITE_END(); @@ -52,6 +55,8 @@ class TileQueueTests : public CPPUNIT_NS::TestFixture void testTileRecombining(); void testViewOrder(); void testPreviewsDeprioritization(); + void testSenderQueue(); + void testSenderQueueTileDeduplication(); }; void TileQueueTests::testTileQueuePriority() @@ -259,6 +264,94 @@ void TileQueueTests::testPreviewsDeprioritization() CPPUNIT_ASSERT_EQUAL(0, static_cast<int>(queue._queue.size())); } +void TileQueueTests::testSenderQueue() +{ + SenderQueue<std::shared_ptr<MessagePayload>> queue; + + std::shared_ptr<MessagePayload> item; + + // Empty queue + CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> messages = + { + "message 1", + "message 2", + "message 3" + }; + + for (const auto& msg : messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(3UL, queue.size()); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(2UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(messages[0], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(1UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(messages[1], std::string(item->data().data(), item->data().size())); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 0)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(messages[2], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); +} + +void TileQueueTests::testSenderQueueTileDeduplication() +{ + SenderQueue<std::shared_ptr<MessagePayload>> queue; + + std::shared_ptr<MessagePayload> item; + + // Empty queue + CPPUNIT_ASSERT_EQUAL(false, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> part_messages = + { + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=0", + "tile: part=1 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1", + "tile: part=2 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1" + }; + + for (const auto& msg : part_messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(3UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); + + const std::vector<std::string> dup_messages = + { + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=-1", + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1", + "tile: part=0 width=180 height=135 tileposx=0 tileposy=0 tilewidth=15875 tileheight=11906 ver=1" + }; + + for (const auto& msg : dup_messages) + { + queue.enqueue(std::make_shared<MessagePayload>(msg)); + } + + CPPUNIT_ASSERT_EQUAL(1UL, queue.size()); + CPPUNIT_ASSERT_EQUAL(true, queue.waitDequeue(item, 10)); + + // The last one should persist. + CPPUNIT_ASSERT_EQUAL(dup_messages[2], std::string(item->data().data(), item->data().size())); + + CPPUNIT_ASSERT_EQUAL(0UL, queue.size()); +} + CPPUNIT_TEST_SUITE_REGISTRATION(TileQueueTests); /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp index eb8ae8d..912d66a 100644 --- a/wsd/SenderQueue.hpp +++ b/wsd/SenderQueue.hpp @@ -19,6 +19,7 @@ #include "common/SigUtil.hpp" #include "LOOLWebSocket.hpp" #include "Log.hpp" +#include "TileDesc.hpp" /// The payload type used to send/receive data. class MessagePayload @@ -115,7 +116,10 @@ public: std::unique_lock<std::mutex> lock(_mutex); if (!stopping()) { - _queue.push_back(item); + if (deduplicate(item)) + { + _queue.push_back(item); + } } const size_t queuesize = _queue.size(); @@ -156,9 +160,40 @@ public: } private: + /// Deduplicate messages based on the new one. + /// Returns true if the new message should be + /// enqueued, otherwise false. + bool deduplicate(const Item& item) + { + const std::string line = LOOLProtocol::getFirstLine(item->data()); + const std::string command = LOOLProtocol::getFirstToken(line); + if (command == "tile:") + { + TileDesc newTile = TileDesc::parse(line); + auto begin = std::remove_if(_queue.begin(), _queue.end(), + [&newTile](const queue_item_t& cur) + { + const std::string curLine = LOOLProtocol::getFirstLine(cur->data()); + const std::string curCommand = LOOLProtocol::getFirstToken(curLine); + if (curCommand == "tile:") + { + return (newTile == TileDesc::parse(curLine)); + } + + return false; + }); + + _queue.erase(begin, _queue.end()); + } + + return true; + } + +private: mutable std::mutex _mutex; std::condition_variable _cv; std::deque<Item> _queue; + typedef typename std::deque<Item>::value_type queue_item_t; std::atomic<bool> _stop; }; diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp index 765341d..9721deb 100644 --- a/wsd/TileDesc.hpp +++ b/wsd/TileDesc.hpp @@ -65,6 +65,19 @@ public: int getId() const { return _id; } bool getBroadcast() const { return _broadcast; } + bool operator==(const TileDesc& other) const + { + return _part == other._part && + _width == other._width && + _height == other._height && + _tilePosX == other._tilePosX && + _tilePosY == other._tilePosY && + _tileWidth == other._tileWidth && + _tileHeight == other._tileHeight && + _id == other._id && + _broadcast == other._broadcast; + } + bool intersectsWithRect(int x, int y, int w, int h) const { return x + w >= getTilePosX() && _______________________________________________ Libreoffice-commits mailing list [email protected] https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits
