Skip to content
Snippets Groups Projects
Commit 64e78b06 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

#67 Fix packet ordering problem

parent e6cf226c
No related branches found
No related tags found
No related merge requests found
......@@ -192,6 +192,7 @@ add_library(beyond-protocol STATIC
src/streams/broadcaster.cpp
src/streams/netstream.cpp
src/streams/filestream.cpp
src/streams/packetmanager.cpp
src/node.cpp
src/self.cpp
src/protocol.cpp
......
......@@ -10,6 +10,7 @@
#include <vector>
#include <string>
#include <tuple>
#include <utility>
#include <ftl/protocol/codecs.hpp>
#include <ftl/protocol/channels.hpp>
......@@ -101,7 +102,7 @@ struct StreamPacket {
inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; }
int64_t localTimestamp; // Not message packet / saved
unsigned int hint_capability; // Is this a video stream, for example
mutable unsigned int hint_capability; // Is this a video stream, for example
size_t hint_source_total; // Number of tracks per frame to expect
int retry_count = 0; // Decode retry count
unsigned int hint_peerid = 0;
......@@ -111,5 +112,7 @@ struct StreamPacket {
struct Packet : public StreamPacket, public DataPacket {};
using PacketPair = std::pair<StreamPacket, DataPacket>;
} // namespace protocol
} // namespace ftl
......@@ -204,7 +204,9 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
if (!active_) return;
StreamPacket spkt = spkt_raw;
ftl::protocol::PacketPair pair;
StreamPacket &spkt = pair.first;
spkt = spkt_raw;
spkt.localTimestamp = now - int64_t(ttimeoff);
spkt.hint_capability = 0;
spkt.hint_source_total = 0;
......@@ -260,8 +262,14 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
_processRequest(p, &spkt, pkt);
}
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
pair.second = std::move(pkt);
mgr_.submit(pair, [this, now, ttimeoff, p](const ftl::protocol::PacketPair &pair) {
const StreamPacket &spkt = pair.first;
const DataPacket &pkt = pair.second;
trigger(spkt, pkt);
if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
});
}
void Net::inject(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
......@@ -283,8 +291,9 @@ bool Net::begin() {
net_->bind(base_uri_, [this](
ftl::net::Peer &p,
int16_t ttimeoff,
const StreamPacketMSGPACK &spkt_raw,
const PacketMSGPACK &pkt) {
StreamPacketMSGPACK &spkt_raw,
PacketMSGPACK &pkt) {
_processPacket(&p, ttimeoff, spkt_raw, pkt);
});
......@@ -439,7 +448,7 @@ void Net::_cleanUp() {
* batches (max 255 unique frames by timestamp). Requests are in the form
* of packets that match the request except the data component is empty.
*/
bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacket &pkt) {
bool Net::_processRequest(ftl::net::Peer *p, const StreamPacket *spkt, const DataPacket &pkt) {
bool found = false;
if (spkt->streamID == 255 || spkt->frame_number == 255) {
......
......@@ -14,6 +14,7 @@
#include <ftl/protocol/packet.hpp>
#include <ftl/protocol/streams.hpp>
#include <ftl/handle.hpp>
#include "packetmanager.hpp"
namespace ftl {
namespace protocol {
......@@ -108,7 +109,7 @@ class Net : public Stream {
bool paused_ = false;
int frames_to_request_ = kFramesToRequest;
std::string name_;
ftl::PacketManager mgr_;
ftl::Handler<ftl::net::Peer*> connect_cb_;
uint32_t local_fsid_ = 0;
......@@ -123,7 +124,7 @@ class Net : public Stream {
std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_;
bool _enable(FrameID id);
bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt);
bool _processRequest(ftl::net::Peer *p, const ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt);
void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts);
void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts);
bool _sendRequest(
......
/**
* @file packetmanager.cpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#include <utility>
#include <vector>
#include <limits>
#include "packetmanager.hpp"
#include <ftl/protocol/frameid.hpp>
#include <loguru.hpp>
using ftl::PacketManager;
using ftl::StreamState;
using ftl::protocol::PacketPair;
using ftl::protocol::FrameID;
using ftl::protocol::Channel;
StreamState &PacketManager::getState(FrameID id) {
{
SHARED_LOCK(mtx_, lk);
auto it = state_.find(id.id);
if (it != state_.end()) return it->second;
}
UNIQUE_LOCK(mtx_, lk);
return state_[id.id];
}
void PacketManager::submit(PacketPair &packets, const std::function<void(const PacketPair &)> &cb, bool noLoop) {
auto &state = getState(FrameID(packets.first.frameSetID(), packets.first.frameNumber()));
if (state.timestamp == -1) {
state.timestamp = packets.first.timestamp;
}
if (state.timestamp == packets.first.timestamp) {
if (packets.first.channel == Channel::kEndFrame) {
state.expected = packets.second.packet_count;
}
cb(packets);
++state.processed;
if (state.processed == state.expected) {
UNIQUE_LOCK(state.mtx, lk);
if (state.processed == state.expected) {
state.processed = 0;
state.expected = -1;
if (state.writePos > state.readPos) {
size_t start = state.readPos;
size_t stop = state.writePos;
state.readPos = stop;
state.timestamp = std::numeric_limits<int64_t>::max();
for (size_t i = start; i < stop; ++i) {
state.timestamp = std::min(
state.timestamp,
state.buffer[i % StreamState::kMaxBuffer].first.timestamp);
}
lk.unlock();
// Loop over the buffer, checking for anything that can be processed
for (size_t i = start; i < stop; ++i) {
if (state.buffer[i].first.channel == Channel::kEndFrame) {
--state.bufferedEndFrames;
}
submit(state.buffer[i], cb, true);
}
} else {
state.timestamp = -1;
return;
}
}
}
} else if (state.timestamp > packets.first.timestamp) {
LOG(WARNING) << "Old packet received";
} else {
DLOG(WARNING) << "Buffer packets: " << packets.first.timestamp;
// Change the current frame
UNIQUE_LOCK(state.mtx, lk);
if (state.timestamp == packets.first.timestamp) {
lk.unlock();
submit(packets, cb);
return;
}
// Add packet to buffer;
auto wpos = state.writePos++;
state.buffer[wpos % StreamState::kMaxBuffer] = std::move(packets);
lk.unlock();
if (packets.first.channel == Channel::kEndFrame) {
++state.bufferedEndFrames;
}
if (state.bufferedEndFrames > 4) {
LOG(WARNING) << "Discarding incomplete frame: " << state.timestamp;
UNIQUE_LOCK(state.mtx, lk);
state.processed = 0;
state.expected = -1;
size_t start = state.readPos;
size_t stop = state.writePos;
state.readPos = stop;
state.timestamp = std::numeric_limits<int64_t>::max();
for (size_t i = start; i < stop; ++i) {
state.timestamp = std::min(
state.timestamp,
state.buffer[i % StreamState::kMaxBuffer].first.timestamp);
}
lk.unlock();
// Loop over the buffer, checking for anything that can be processed
for (size_t i = start; i < stop; ++i) {
if (state.buffer[i].first.channel == Channel::kEndFrame) {
--state.bufferedEndFrames;
}
submit(state.buffer[i], cb, true);
std::vector<uint8_t> temp;
state.buffer[i].second.data.swap(temp);
}
}
}
}
/**
* @file packetmanager.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once
#include <functional>
#include <list>
#include <atomic>
#include <unordered_map>
#include <memory>
#include <tuple>
#include <array>
#include <ftl/protocol/packet.hpp>
#include <ftl/threads.hpp>
#include <ftl/protocol/frameid.hpp>
namespace ftl {
struct StreamState {
static constexpr int kMaxBuffer = 100;
MUTEX mtx;
std::array<ftl::protocol::PacketPair, kMaxBuffer> buffer;
int64_t timestamp = -1;
int64_t minTimestamp = -1;
int expected = -1;
std::atomic_int processed = 0;
size_t readPos = 0;
size_t writePos = 0;
std::atomic_int bufferedEndFrames = 0;
};
class PacketManager {
public:
void submit(
ftl::protocol::PacketPair &,
const std::function<void(const ftl::protocol::PacketPair &)> &,
bool noLoop = false);
void reset();
private:
SHARED_MUTEX mtx_;
std::unordered_map<uint32_t, StreamState> state_;
StreamState &getState(ftl::protocol::FrameID);
};
} // namespace ftl
......@@ -162,3 +162,14 @@ target_link_libraries(datacodec_unit beyond-protocol
${URIPARSER_LIBRARIES})
add_test(DataCodecTest datacodec_unit)
### Packet Manager #############################################################
add_executable(packetmanager_unit
$<TARGET_OBJECTS:CatchTestFTL>
./packetmanager_unit.cpp)
target_include_directories(packetmanager_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include")
target_link_libraries(packetmanager_unit beyond-protocol
Threads::Threads ${OS_LIBS}
${URIPARSER_LIBRARIES})
add_test(PacketManagerTest packetmanager_unit)
#include "catch.hpp"
#include "../src/streams/packetmanager.hpp"
using ftl::PacketManager;
using ftl::protocol::Channel;
using ftl::protocol::StreamPacket;
using ftl::protocol::DataPacket;
using ftl::protocol::PacketPair;
static ftl::protocol::PacketPair makePair(int64_t ts, Channel c) {
StreamPacket spkt;
spkt.timestamp = ts;
spkt.streamID = 0;
spkt.channel = c;
spkt.frame_number = 0;
DataPacket pkt;
return {spkt, pkt};
}
TEST_CASE( "PacketManager multiple in-order frames" ) {
PacketManager mgr;
int count = 0;
PacketPair p;
p = makePair(100, Channel::kColour);
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(100, Channel::kPose);
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(100, Channel::kEndFrame);
p.second.packet_count = 3;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(101, Channel::kPose);
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
REQUIRE(count == 4);
}
TEST_CASE( "PacketManager out-of-order frames" ) {
PacketManager mgr;
int count = 0;
PacketPair p;
p = makePair(200, Channel::kColour);
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(201, Channel::kPose);
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
REQUIRE(count == 1);
p = makePair(200, Channel::kEndFrame);
p.second.packet_count = 2;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
REQUIRE(count == 3);
}
TEST_CASE( "PacketManager many out-of-order frames" ) {
PacketManager mgr;
std::vector<int64_t> times;
PacketPair p;
p = makePair(300, Channel::kColour);
mgr.submit(p, [&times](const PacketPair &pp) {
times.push_back(pp.first.timestamp);
});
p = makePair(301, Channel::kPose);
mgr.submit(p, [&times](const PacketPair &pp) {
times.push_back(pp.first.timestamp);
});
p = makePair(302, Channel::kPose);
mgr.submit(p, [&times](const PacketPair &pp) {
times.push_back(pp.first.timestamp);
});
p = makePair(301, Channel::kDepth);
mgr.submit(p, [&times](const PacketPair &pp) {
times.push_back(pp.first.timestamp);
});
REQUIRE(times.size() == 1);
p = makePair(300, Channel::kEndFrame);
p.second.packet_count = 2;
mgr.submit(p, [&times](const PacketPair &pp) {
times.push_back(pp.first.timestamp);
});
REQUIRE(times.size() == 4);
p = makePair(301, Channel::kEndFrame);
p.second.packet_count = 3;
mgr.submit(p, [&times](const PacketPair &pp) {
times.push_back(pp.first.timestamp);
});
REQUIRE(times.size() == 6);
REQUIRE(times[0] == 300);
REQUIRE(times[1] == 300);
REQUIRE(times[2] == 301);
REQUIRE(times[3] == 301);
REQUIRE(times[4] == 301);
REQUIRE(times[5] == 302);
}
TEST_CASE( "Incomplete frames" ) {
PacketManager mgr;
int count = 0;
PacketPair p;
p = makePair(400, Channel::kEndFrame);
p.second.packet_count = 2;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(401, Channel::kEndFrame);
p.second.packet_count = 1;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(402, Channel::kEndFrame);
p.second.packet_count = 1;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
REQUIRE(count == 1);
p = makePair(403, Channel::kEndFrame);
p.second.packet_count = 1;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(404, Channel::kEndFrame);
p.second.packet_count = 1;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(405, Channel::kEndFrame);
p.second.packet_count = 1;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
p = makePair(406, Channel::kEndFrame);
p.second.packet_count = 1;
mgr.submit(p, [&count](const PacketPair &pp) {
++count;
});
REQUIRE(count == 7);
}
......@@ -127,7 +127,7 @@ TEST_CASE("TCP Stream", "[net]") {
ftl::protocol::DataPacket pkt;
pkt.bitrate = 10;
pkt.codec = ftl::protocol::Codec::kJPG;
pkt.frame_count = 1;
pkt.packet_count = 1;
for (int i=0; i<30 + 20; ++i) {
spkt.timestamp = i;
......@@ -142,6 +142,74 @@ TEST_CASE("TCP Stream", "[net]") {
REQUIRE( rcount == 30 );
}
SECTION("handles out-of-order packets") {
MUTEX mtx;
std::vector<int64_t> times;
times.reserve(24);
auto s1 = ftl::createStream("ftl://mystream");
REQUIRE( s1 );
auto s2 = self->getStream("ftl://mystream");
REQUIRE( s2 );
auto h = s2->onPacket([&mtx, &times](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
UNIQUE_LOCK(mtx, lk);
times.push_back(spkt.timestamp);
return true;
});
s1->begin();
s2->begin();
REQUIRE(s1->active(FrameID(0, 0)) == false);
s2->enable(FrameID(0, 0));
// TODO: Find better option
std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE(s1->active(FrameID(0, 0)) == true);
ftl::protocol::StreamPacket spkt;
spkt.streamID = 0;
spkt.frame_number = 0;
spkt.channel = ftl::protocol::Channel::kEndFrame;
ftl::protocol::DataPacket pkt;
pkt.bitrate = 10;
pkt.codec = ftl::protocol::Codec::kJPG;
pkt.packet_count = 1;
spkt.timestamp = 100;
pkt.packet_count = 2;
s1->post(spkt, pkt);
pkt.packet_count = 1;
spkt.timestamp = 120;
s1->post(spkt, pkt);
spkt.timestamp = 110;
pkt.packet_count = 21;
s1->post(spkt, pkt);
spkt.channel = ftl::protocol::Channel::kColour;
for (int i=0; i < 20; ++i) {
spkt.timestamp = 110;
s1->post(spkt, pkt);
}
spkt.timestamp = 100;
s1->post(spkt, pkt);
// TODO: Find better option
int k = 15;
while (--k > 0 && times.size() < 24) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
REQUIRE( times.size() == 24 );
REQUIRE(times[0] == 100);
REQUIRE(times[1] == 100);
REQUIRE(times[2] == 110);
REQUIRE(times[23] == 120);
}
p.reset();
ftl::protocol::reset();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment