diff --git a/CMakeLists.txt b/CMakeLists.txt index c871546af90c5f38ef7074df827a1595ed8a7b01..c7656ec7d0d907f108fc257702f320c557a2c2ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -192,6 +192,7 @@ add_library(beyond-protocol STATIC src/streams/broadcaster.cpp src/streams/netstream.cpp src/streams/filestream.cpp + src/streams/packetmanager.cpp src/node.cpp src/self.cpp src/protocol.cpp diff --git a/include/ftl/protocol/packet.hpp b/include/ftl/protocol/packet.hpp index 485ed85338a3505432daaa4b49d1ff75e6a3f2e0..0227c7180af362d868e834834a5a35cd67601b4b 100644 --- a/include/ftl/protocol/packet.hpp +++ b/include/ftl/protocol/packet.hpp @@ -10,6 +10,7 @@ #include <vector> #include <string> #include <tuple> +#include <utility> #include <ftl/protocol/codecs.hpp> #include <ftl/protocol/channels.hpp> @@ -101,7 +102,7 @@ struct StreamPacket { inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; } int64_t localTimestamp; // Not message packet / saved - unsigned int hint_capability; // Is this a video stream, for example + mutable unsigned int hint_capability; // Is this a video stream, for example size_t hint_source_total; // Number of tracks per frame to expect int retry_count = 0; // Decode retry count unsigned int hint_peerid = 0; @@ -111,5 +112,7 @@ struct StreamPacket { struct Packet : public StreamPacket, public DataPacket {}; +using PacketPair = std::pair<StreamPacket, DataPacket>; + } // namespace protocol } // namespace ftl diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 91e9ac9c607df3e8af2581978df96bf0329d4b41..c411bbe4f74cac983ccafdc221b3616de5a9a466 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -204,7 +204,9 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket if (!active_) return; - StreamPacket spkt = spkt_raw; + ftl::protocol::PacketPair pair; + StreamPacket &spkt = pair.first; + spkt = spkt_raw; spkt.localTimestamp = now - int64_t(ttimeoff); spkt.hint_capability = 0; spkt.hint_source_total = 0; @@ -260,8 +262,14 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket _processRequest(p, &spkt, pkt); } - trigger(spkt, pkt); - if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); + pair.second = std::move(pkt); + mgr_.submit(pair, [this, now, ttimeoff, p](const ftl::protocol::PacketPair &pair) { + const StreamPacket &spkt = pair.first; + const DataPacket &pkt = pair.second; + + trigger(spkt, pkt); + if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); + }); } void Net::inject(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { @@ -283,8 +291,9 @@ bool Net::begin() { net_->bind(base_uri_, [this]( ftl::net::Peer &p, int16_t ttimeoff, - const StreamPacketMSGPACK &spkt_raw, - const PacketMSGPACK &pkt) { + StreamPacketMSGPACK &spkt_raw, + PacketMSGPACK &pkt) { + _processPacket(&p, ttimeoff, spkt_raw, pkt); }); @@ -439,7 +448,7 @@ void Net::_cleanUp() { * batches (max 255 unique frames by timestamp). Requests are in the form * of packets that match the request except the data component is empty. */ -bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacket &pkt) { +bool Net::_processRequest(ftl::net::Peer *p, const StreamPacket *spkt, const DataPacket &pkt) { bool found = false; if (spkt->streamID == 255 || spkt->frame_number == 255) { diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index eeedaaa659378ac837b595e8b4171b7d43812556..6029af801b3c8550ff20aa3ae9f10e8a1d9832eb 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -14,6 +14,7 @@ #include <ftl/protocol/packet.hpp> #include <ftl/protocol/streams.hpp> #include <ftl/handle.hpp> +#include "packetmanager.hpp" namespace ftl { namespace protocol { @@ -108,7 +109,7 @@ class Net : public Stream { bool paused_ = false; int frames_to_request_ = kFramesToRequest; std::string name_; - + ftl::PacketManager mgr_; ftl::Handler<ftl::net::Peer*> connect_cb_; uint32_t local_fsid_ = 0; @@ -123,7 +124,7 @@ class Net : public Stream { std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_; bool _enable(FrameID id); - bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt); + bool _processRequest(ftl::net::Peer *p, const ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt); void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts); void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts); bool _sendRequest( diff --git a/src/streams/packetmanager.cpp b/src/streams/packetmanager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..139cc942e09e67e6554e88c26ae463c0ea2d76bf --- /dev/null +++ b/src/streams/packetmanager.cpp @@ -0,0 +1,127 @@ +/** + * @file packetmanager.cpp + * @copyright Copyright (c) 2022 University of Turku, MIT License + * @author Nicolas Pope + */ + +#include <utility> +#include <vector> +#include <limits> +#include "packetmanager.hpp" +#include <ftl/protocol/frameid.hpp> + +#include <loguru.hpp> + +using ftl::PacketManager; +using ftl::StreamState; +using ftl::protocol::PacketPair; +using ftl::protocol::FrameID; +using ftl::protocol::Channel; + +StreamState &PacketManager::getState(FrameID id) { + { + SHARED_LOCK(mtx_, lk); + auto it = state_.find(id.id); + if (it != state_.end()) return it->second; + } + UNIQUE_LOCK(mtx_, lk); + return state_[id.id]; +} + +void PacketManager::submit(PacketPair &packets, const std::function<void(const PacketPair &)> &cb, bool noLoop) { + auto &state = getState(FrameID(packets.first.frameSetID(), packets.first.frameNumber())); + + if (state.timestamp == -1) { + state.timestamp = packets.first.timestamp; + } + + if (state.timestamp == packets.first.timestamp) { + if (packets.first.channel == Channel::kEndFrame) { + state.expected = packets.second.packet_count; + } + cb(packets); + ++state.processed; + + if (state.processed == state.expected) { + UNIQUE_LOCK(state.mtx, lk); + if (state.processed == state.expected) { + state.processed = 0; + state.expected = -1; + + if (state.writePos > state.readPos) { + size_t start = state.readPos; + size_t stop = state.writePos; + state.readPos = stop; + + state.timestamp = std::numeric_limits<int64_t>::max(); + for (size_t i = start; i < stop; ++i) { + state.timestamp = std::min( + state.timestamp, + state.buffer[i % StreamState::kMaxBuffer].first.timestamp); + } + + lk.unlock(); + // Loop over the buffer, checking for anything that can be processed + for (size_t i = start; i < stop; ++i) { + if (state.buffer[i].first.channel == Channel::kEndFrame) { + --state.bufferedEndFrames; + } + submit(state.buffer[i], cb, true); + } + } else { + state.timestamp = -1; + return; + } + } + } + } else if (state.timestamp > packets.first.timestamp) { + LOG(WARNING) << "Old packet received"; + } else { + DLOG(WARNING) << "Buffer packets: " << packets.first.timestamp; + // Change the current frame + UNIQUE_LOCK(state.mtx, lk); + if (state.timestamp == packets.first.timestamp) { + lk.unlock(); + submit(packets, cb); + return; + } + + // Add packet to buffer; + auto wpos = state.writePos++; + state.buffer[wpos % StreamState::kMaxBuffer] = std::move(packets); + lk.unlock(); + + if (packets.first.channel == Channel::kEndFrame) { + ++state.bufferedEndFrames; + } + + if (state.bufferedEndFrames > 4) { + LOG(WARNING) << "Discarding incomplete frame: " << state.timestamp; + UNIQUE_LOCK(state.mtx, lk); + state.processed = 0; + state.expected = -1; + + size_t start = state.readPos; + size_t stop = state.writePos; + state.readPos = stop; + + state.timestamp = std::numeric_limits<int64_t>::max(); + for (size_t i = start; i < stop; ++i) { + state.timestamp = std::min( + state.timestamp, + state.buffer[i % StreamState::kMaxBuffer].first.timestamp); + } + + lk.unlock(); + // Loop over the buffer, checking for anything that can be processed + for (size_t i = start; i < stop; ++i) { + if (state.buffer[i].first.channel == Channel::kEndFrame) { + --state.bufferedEndFrames; + } + submit(state.buffer[i], cb, true); + std::vector<uint8_t> temp; + state.buffer[i].second.data.swap(temp); + } + } + } +} diff --git a/src/streams/packetmanager.hpp b/src/streams/packetmanager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e962bf037c42270f76800cdfa3d832801168f44e --- /dev/null +++ b/src/streams/packetmanager.hpp @@ -0,0 +1,53 @@ +/** + * @file packetmanager.hpp + * @copyright Copyright (c) 2022 University of Turku, MIT License + * @author Nicolas Pope + */ + +#pragma once + +#include <functional> +#include <list> +#include <atomic> +#include <unordered_map> +#include <memory> +#include <tuple> +#include <array> +#include <ftl/protocol/packet.hpp> +#include <ftl/threads.hpp> +#include <ftl/protocol/frameid.hpp> + +namespace ftl { + +struct StreamState { + static constexpr int kMaxBuffer = 100; + + MUTEX mtx; + std::array<ftl::protocol::PacketPair, kMaxBuffer> buffer; + + int64_t timestamp = -1; + int64_t minTimestamp = -1; + int expected = -1; + std::atomic_int processed = 0; + size_t readPos = 0; + size_t writePos = 0; + std::atomic_int bufferedEndFrames = 0; +}; + +class PacketManager { + public: + void submit( + ftl::protocol::PacketPair &, + const std::function<void(const ftl::protocol::PacketPair &)> &, + bool noLoop = false); + + void reset(); + + private: + SHARED_MUTEX mtx_; + std::unordered_map<uint32_t, StreamState> state_; + + StreamState &getState(ftl::protocol::FrameID); +}; + +} // namespace ftl diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e6f9fe36108dc2b4e2b2b566762e66cce4659ab1..3e2f643d7cef882d5e34963330515dd441027d00 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -162,3 +162,14 @@ target_link_libraries(datacodec_unit beyond-protocol ${URIPARSER_LIBRARIES}) add_test(DataCodecTest datacodec_unit) + +### Packet Manager ############################################################# +add_executable(packetmanager_unit + $<TARGET_OBJECTS:CatchTestFTL> + ./packetmanager_unit.cpp) +target_include_directories(packetmanager_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(packetmanager_unit beyond-protocol + Threads::Threads ${OS_LIBS} + ${URIPARSER_LIBRARIES}) + +add_test(PacketManagerTest packetmanager_unit) diff --git a/test/packetmanager_unit.cpp b/test/packetmanager_unit.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d073c759371795ae7a7ad53889ce61dae37c6f5a --- /dev/null +++ b/test/packetmanager_unit.cpp @@ -0,0 +1,182 @@ +#include "catch.hpp" +#include "../src/streams/packetmanager.hpp" + +using ftl::PacketManager; +using ftl::protocol::Channel; +using ftl::protocol::StreamPacket; +using ftl::protocol::DataPacket; +using ftl::protocol::PacketPair; + +static ftl::protocol::PacketPair makePair(int64_t ts, Channel c) { + StreamPacket spkt; + spkt.timestamp = ts; + spkt.streamID = 0; + spkt.channel = c; + spkt.frame_number = 0; + + DataPacket pkt; + return {spkt, pkt}; +} + +TEST_CASE( "PacketManager multiple in-order frames" ) { + PacketManager mgr; + + int count = 0; + + PacketPair p; + p = makePair(100, Channel::kColour); + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(100, Channel::kPose); + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(100, Channel::kEndFrame); + p.second.packet_count = 3; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(101, Channel::kPose); + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + REQUIRE(count == 4); +} + +TEST_CASE( "PacketManager out-of-order frames" ) { + PacketManager mgr; + + int count = 0; + + PacketPair p; + p = makePair(200, Channel::kColour); + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(201, Channel::kPose); + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + REQUIRE(count == 1); + + p = makePair(200, Channel::kEndFrame); + p.second.packet_count = 2; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + REQUIRE(count == 3); +} + +TEST_CASE( "PacketManager many out-of-order frames" ) { + PacketManager mgr; + + std::vector<int64_t> times; + + PacketPair p; + p = makePair(300, Channel::kColour); + mgr.submit(p, [×](const PacketPair &pp) { + times.push_back(pp.first.timestamp); + }); + + p = makePair(301, Channel::kPose); + mgr.submit(p, [×](const PacketPair &pp) { + times.push_back(pp.first.timestamp); + }); + + p = makePair(302, Channel::kPose); + mgr.submit(p, [×](const PacketPair &pp) { + times.push_back(pp.first.timestamp); + }); + + p = makePair(301, Channel::kDepth); + mgr.submit(p, [×](const PacketPair &pp) { + times.push_back(pp.first.timestamp); + }); + + REQUIRE(times.size() == 1); + + p = makePair(300, Channel::kEndFrame); + p.second.packet_count = 2; + mgr.submit(p, [×](const PacketPair &pp) { + times.push_back(pp.first.timestamp); + }); + + REQUIRE(times.size() == 4); + + p = makePair(301, Channel::kEndFrame); + p.second.packet_count = 3; + mgr.submit(p, [×](const PacketPair &pp) { + times.push_back(pp.first.timestamp); + }); + + REQUIRE(times.size() == 6); + + REQUIRE(times[0] == 300); + REQUIRE(times[1] == 300); + REQUIRE(times[2] == 301); + REQUIRE(times[3] == 301); + REQUIRE(times[4] == 301); + REQUIRE(times[5] == 302); +} + +TEST_CASE( "Incomplete frames" ) { + PacketManager mgr; + + int count = 0; + + PacketPair p; + + p = makePair(400, Channel::kEndFrame); + p.second.packet_count = 2; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(401, Channel::kEndFrame); + p.second.packet_count = 1; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(402, Channel::kEndFrame); + p.second.packet_count = 1; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + REQUIRE(count == 1); + + p = makePair(403, Channel::kEndFrame); + p.second.packet_count = 1; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(404, Channel::kEndFrame); + p.second.packet_count = 1; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(405, Channel::kEndFrame); + p.second.packet_count = 1; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + p = makePair(406, Channel::kEndFrame); + p.second.packet_count = 1; + mgr.submit(p, [&count](const PacketPair &pp) { + ++count; + }); + + REQUIRE(count == 7); +} diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index 161b5669509b6691fcef63c91fe290536e478536..259e068ab3e015533e9ccff6499b3195c3bf00db 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -127,7 +127,7 @@ TEST_CASE("TCP Stream", "[net]") { ftl::protocol::DataPacket pkt; pkt.bitrate = 10; pkt.codec = ftl::protocol::Codec::kJPG; - pkt.frame_count = 1; + pkt.packet_count = 1; for (int i=0; i<30 + 20; ++i) { spkt.timestamp = i; @@ -142,6 +142,74 @@ TEST_CASE("TCP Stream", "[net]") { REQUIRE( rcount == 30 ); } + SECTION("handles out-of-order packets") { + MUTEX mtx; + std::vector<int64_t> times; + times.reserve(24); + + auto s1 = ftl::createStream("ftl://mystream"); + REQUIRE( s1 ); + + auto s2 = self->getStream("ftl://mystream"); + REQUIRE( s2 ); + + auto h = s2->onPacket([&mtx, ×](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { + UNIQUE_LOCK(mtx, lk); + times.push_back(spkt.timestamp); + return true; + }); + + s1->begin(); + s2->begin(); + + REQUIRE(s1->active(FrameID(0, 0)) == false); + + s2->enable(FrameID(0, 0)); + + // TODO: Find better option + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + REQUIRE(s1->active(FrameID(0, 0)) == true); + + ftl::protocol::StreamPacket spkt; + spkt.streamID = 0; + spkt.frame_number = 0; + spkt.channel = ftl::protocol::Channel::kEndFrame; + ftl::protocol::DataPacket pkt; + pkt.bitrate = 10; + pkt.codec = ftl::protocol::Codec::kJPG; + pkt.packet_count = 1; + + spkt.timestamp = 100; + pkt.packet_count = 2; + s1->post(spkt, pkt); + pkt.packet_count = 1; + spkt.timestamp = 120; + s1->post(spkt, pkt); + spkt.timestamp = 110; + pkt.packet_count = 21; + s1->post(spkt, pkt); + spkt.channel = ftl::protocol::Channel::kColour; + for (int i=0; i < 20; ++i) { + spkt.timestamp = 110; + s1->post(spkt, pkt); + } + spkt.timestamp = 100; + s1->post(spkt, pkt); + + // TODO: Find better option + int k = 15; + while (--k > 0 && times.size() < 24) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + REQUIRE( times.size() == 24 ); + + REQUIRE(times[0] == 100); + REQUIRE(times[1] == 100); + REQUIRE(times[2] == 110); + REQUIRE(times[23] == 120); + } + p.reset(); ftl::protocol::reset(); }